diff --git a/packages/actor-core/src/actor/action.ts b/packages/actor-core/src/actor/action.ts index ce528e1d6..ff530b741 100644 --- a/packages/actor-core/src/actor/action.ts +++ b/packages/actor-core/src/actor/action.ts @@ -57,6 +57,13 @@ export class ActionContext { return this.#actorContext.log; } + /** + * Gets actor ID. + */ + get actorId(): string { + return this.#actorContext.actorId; + } + /** * Gets the actor name. */ diff --git a/packages/actor-core/src/actor/connection.ts b/packages/actor-core/src/actor/connection.ts index e20955fdb..5831ee5f4 100644 --- a/packages/actor-core/src/actor/connection.ts +++ b/packages/actor-core/src/actor/connection.ts @@ -5,6 +5,7 @@ import { CachedSerializer } from "./protocol/serde"; import type { ConnDriver } from "./driver"; import * as messageToClient from "@/actor/protocol/message/to-client"; import type { PersistedConn } from "./persisted"; +import * as wsToClient from "@/actor/protocol/message/to-client"; export function generateConnId(): string { return crypto.randomUUID(); @@ -135,7 +136,7 @@ export class Conn { */ public send(eventName: string, ...args: unknown[]) { this._sendMessage( - new CachedSerializer({ + new CachedSerializer({ b: { ev: { n: eventName, diff --git a/packages/actor-core/src/actor/context.ts b/packages/actor-core/src/actor/context.ts index d304fbc7b..7b92fea6f 100644 --- a/packages/actor-core/src/actor/context.ts +++ b/packages/actor-core/src/actor/context.ts @@ -5,7 +5,6 @@ import { Conn, ConnId } from "./connection"; import { ActorKey } from "@/common/utils"; import { Schedule } from "./schedule"; - /** * ActorContext class that provides access to actor methods and state */ @@ -36,7 +35,6 @@ export class ActorContext { * @param args - The arguments to send with the event. */ broadcast>(name: string, ...args: Args): void { - // @ts-ignore - Access protected method this.#actor._broadcast(name, ...args); return; } @@ -45,15 +43,20 @@ export class ActorContext { * Gets the logger instance. */ get log(): Logger { - // @ts-ignore - Access protected method return this.#actor.log; } + /** + * Gets actor ID. + */ + get actorId(): string { + return this.#actor.id; + } + /** * Gets the actor name. */ get name(): string { - // @ts-ignore - Access protected method return this.#actor.name; } @@ -61,7 +64,6 @@ export class ActorContext { * Gets the actor key. */ get key(): ActorKey { - // @ts-ignore - Access protected method return this.#actor.key; } @@ -69,7 +71,6 @@ export class ActorContext { * Gets the region. */ get region(): string { - // @ts-ignore - Access protected method return this.#actor.region; } @@ -77,7 +78,6 @@ export class ActorContext { * Gets the scheduler. */ get schedule(): Schedule { - // @ts-ignore - Access protected method return this.#actor.schedule; } @@ -85,7 +85,6 @@ export class ActorContext { * Gets the map of connections. */ get conns(): Map> { - // @ts-ignore - Access protected method return this.#actor.conns; } @@ -95,7 +94,6 @@ export class ActorContext { * @param opts - Options for saving the state. */ async saveState(opts: SaveStateOptions): Promise { - // @ts-ignore - Access protected method return this.#actor.saveState(opts); } @@ -105,7 +103,6 @@ export class ActorContext { * @param promise - The promise to run in the background. */ runInBackground(promise: Promise): void { - // @ts-ignore - Access protected method this.#actor._runInBackground(promise); return; } diff --git a/packages/actor-core/src/actor/instance.ts b/packages/actor-core/src/actor/instance.ts index 168d7994d..bdfece0d9 100644 --- a/packages/actor-core/src/actor/instance.ts +++ b/packages/actor-core/src/actor/instance.ts @@ -15,6 +15,7 @@ import { instanceLogger, logger } from "./log"; import type { ActionContext } from "./action"; import { DeadlineError, Lock, deadline } from "./utils"; import { Schedule } from "./schedule"; +import * as wsToClient from "@/actor/protocol/message/to-client"; import type * as wsToServer from "@/actor/protocol/message/to-server"; import { CachedSerializer } from "./protocol/serde"; import { ActorInspector } from "@/inspector/actor"; @@ -716,7 +717,7 @@ export class ActorInstance { // Send init message conn._sendMessage( - new CachedSerializer({ + new CachedSerializer({ b: { i: { ci: `${conn.id}`, @@ -1019,7 +1020,7 @@ export class ActorInstance { const subscriptions = this.#subscriptionIndex.get(name); if (!subscriptions) return; - const toClientSerializer = new CachedSerializer({ + const toClientSerializer = new CachedSerializer({ b: { ev: { n: name, diff --git a/packages/actor-core/src/actor/protocol/http/error.ts b/packages/actor-core/src/actor/protocol/http/error.ts new file mode 100644 index 000000000..7c7b9216b --- /dev/null +++ b/packages/actor-core/src/actor/protocol/http/error.ts @@ -0,0 +1,12 @@ +import { z } from "zod"; + +export const ResponseErrorSchema = z.object({ + // Code + c: z.string(), + // Message + m: z.string(), + // Metadata + md: z.unknown().optional(), +}); + +export type ResponseError = z.infer; diff --git a/packages/actor-core/src/actor/protocol/http/rpc.ts b/packages/actor-core/src/actor/protocol/http/rpc.ts index 87b6b21b8..79feca156 100644 --- a/packages/actor-core/src/actor/protocol/http/rpc.ts +++ b/packages/actor-core/src/actor/protocol/http/rpc.ts @@ -1,24 +1,15 @@ import { z } from "zod"; -export const RequestSchema = z.object({ +export const RpcRequestSchema = z.object({ // Args a: z.array(z.unknown()), }); -export const ResponseOkSchema = z.object({ +export const RpcResponseSchema = z.object({ // Output o: z.unknown(), }); -export const ResponseErrSchema = z.object({ - // Code - c: z.string(), - // Message - m: z.string(), - // Metadata - md: z.unknown().optional(), -}); -export type Request = z.infer; -export type ResponseOk = z.infer; -export type ResponseErr = z.infer; +export type RpcRequest = z.infer; +export type RpcResponse = z.infer; diff --git a/packages/actor-core/src/actor/protocol/message/mod.ts b/packages/actor-core/src/actor/protocol/message/mod.ts index 00a6e5ca2..bcaba3c49 100644 --- a/packages/actor-core/src/actor/protocol/message/mod.ts +++ b/packages/actor-core/src/actor/protocol/message/mod.ts @@ -126,7 +126,7 @@ export async function processMessage( conn._sendMessage( new CachedSerializer({ b: { - ro: { + rr: { i: id, o: output, }, @@ -179,32 +179,18 @@ export async function processMessage( }); // Build response - if (rpcId !== undefined) { - conn._sendMessage( - new CachedSerializer({ - b: { - re: { - i: rpcId, - c: code, - m: message, - md: metadata, - }, + conn._sendMessage( + new CachedSerializer({ + b: { + e: { + c: code, + m: message, + md: metadata, + ri: rpcId, }, - }), - ); - } else { - conn._sendMessage( - new CachedSerializer({ - b: { - er: { - c: code, - m: message, - md: metadata, - }, - }, - }), - ); - } + }, + }), + ); logger().debug("error response sent", { rpcId, rpcName }); } diff --git a/packages/actor-core/src/actor/protocol/message/to-client.ts b/packages/actor-core/src/actor/protocol/message/to-client.ts index 34ddd4884..5547d100b 100644 --- a/packages/actor-core/src/actor/protocol/message/to-client.ts +++ b/packages/actor-core/src/actor/protocol/message/to-client.ts @@ -9,64 +9,42 @@ export const InitSchema = z.object({ }); // Used for connection errors (both during initialization and afterwards) -export const ConnectionErrorSchema = z.object({ +export const ErrorSchema = z.object({ // Code c: z.string(), // Message m: z.string(), // Metadata md: z.unknown().optional(), + // RPC ID + ri: z.number().int().optional(), }); -export const RpcResponseOkSchema = z.object({ +export const RpcResponseSchema = z.object({ // ID i: z.number().int(), // Output o: z.unknown(), }); -export const RpcResponseErrorSchema = z.object({ - // ID - i: z.number().int(), - // Code - c: z.string(), - // Message - m: z.string(), - // Metadata - md: z.unknown().optional(), -}); - -export const ToClientEventSchema = z.object({ +export const EventSchema = z.object({ // Name n: z.string(), // Args a: z.array(z.unknown()), }); -export const ToClientErrorSchema = z.object({ - // Code - c: z.string(), - // Message - m: z.string(), - // Metadata - md: z.unknown().optional(), -}); - export const ToClientSchema = z.object({ // Body b: z.union([ z.object({ i: InitSchema }), - z.object({ ce: ConnectionErrorSchema }), - z.object({ ro: RpcResponseOkSchema }), - z.object({ re: RpcResponseErrorSchema }), - z.object({ ev: ToClientEventSchema }), - z.object({ er: ToClientErrorSchema }), + z.object({ e: ErrorSchema }), + z.object({ rr: RpcResponseSchema }), + z.object({ ev: EventSchema }), ]), }); export type ToClient = z.infer; -export type ConnectionError = z.infer; -export type RpcResponseOk = z.infer; -export type RpcResponseError = z.infer; -export type ToClientEvent = z.infer; -export type ToClientError = z.infer; +export type Error = z.infer; +export type RpcResponse = z.infer; +export type Event = z.infer; diff --git a/packages/actor-core/src/actor/protocol/serde.ts b/packages/actor-core/src/actor/protocol/serde.ts index 8118343ba..245513e61 100644 --- a/packages/actor-core/src/actor/protocol/serde.ts +++ b/packages/actor-core/src/actor/protocol/serde.ts @@ -20,7 +20,7 @@ export type Encoding = z.infer; /** * Helper class that helps serialize data without re-serializing for the same encoding. */ -export class CachedSerializer { +export class CachedSerializer { #data: T; #cache = new Map(); diff --git a/packages/actor-core/src/actor/router_endpoints.ts b/packages/actor-core/src/actor/router_endpoints.ts index e4d264584..f54320f21 100644 --- a/packages/actor-core/src/actor/router_endpoints.ts +++ b/packages/actor-core/src/actor/router_endpoints.ts @@ -249,6 +249,8 @@ export async function handleRpc( const encoding = getRequestEncoding(c.req); const parameters = getRequestConnParams(c.req, appConfig, driverConfig); + logger().debug("handling rpc", { rpcName, encoding }); + // Validate incoming request let rpcArgs: unknown[]; if (encoding === "json") { @@ -271,7 +273,7 @@ export async function handleRpc( ); // Validate using the RPC schema - const result = protoHttpRpc.RequestSchema.safeParse(deserialized); + const result = protoHttpRpc.RpcRequestSchema.safeParse(deserialized); if (!result.success) { throw new errors.InvalidRpcRequest("Invalid RPC request format"); } diff --git a/packages/actor-core/src/client/actor_common.ts b/packages/actor-core/src/client/actor_common.ts new file mode 100644 index 000000000..6fe7ad47c --- /dev/null +++ b/packages/actor-core/src/client/actor_common.ts @@ -0,0 +1,29 @@ +import type { AnyActorDefinition, ActorDefinition } from "@/actor/definition"; + +/** + * RPC function returned by Actor connections and handles. + * + * @typedef {Function} ActorRPCFunction + * @template Args + * @template Response + * @param {...Args} args - Arguments for the RPC function. + * @returns {Promise} + */ +export type ActorRPCFunction< + Args extends Array = unknown[], + Response = unknown, +> = ( + ...args: Args extends [unknown, ...infer Rest] ? Rest : Args +) => Promise; + +/** + * Maps RPC methods from actor definition to typed function signatures. + */ +export type ActorDefinitionRpcs = + AD extends ActorDefinition ? { + [K in keyof R]: R[K] extends ( + ...args: infer Args + ) => infer Return + ? ActorRPCFunction + : never; + } : never; \ No newline at end of file diff --git a/packages/actor-core/src/client/actor_conn.ts b/packages/actor-core/src/client/actor_conn.ts index bbdec0c50..e7a3f88e7 100644 --- a/packages/actor-core/src/client/actor_conn.ts +++ b/packages/actor-core/src/client/actor_conn.ts @@ -9,16 +9,20 @@ import * as errors from "./errors"; import { logger } from "./log"; import { type WebSocketMessage as ConnMessage, messageLength } from "./utils"; import { ACTOR_CONNS_SYMBOL, type ClientRaw } from "./client"; -import type { ActorDefinition, AnyActorDefinition } from "@/actor/definition"; +import type { AnyActorDefinition } from "@/actor/definition"; import pRetry from "p-retry"; import { importWebSocket } from "@/common/websocket"; import { importEventSource } from "@/common/eventsource"; -import invariant from "invariant"; import type { ActorQuery } from "@/manager/protocol/query"; +import { ActorDefinitionRpcs as ActorDefinitionRpcsImport } from "./actor_common"; + +// Re-export the type with the original name to maintain compatibility +type ActorDefinitionRpcs = + ActorDefinitionRpcsImport; interface RpcInFlight { name: string; - resolve: (response: wsToClient.RpcResponseOk) => void; + resolve: (response: wsToClient.RpcResponse) => void; reject: (error: Error) => void; } @@ -37,9 +41,9 @@ export type EventUnsubscribe = () => void; /** * A function that handles connection errors. * - * @typedef {Function} ConnectionErrorCallback + * @typedef {Function} ActorErrorCallback */ -export type ConnectionErrorCallback = (error: errors.ConnectionError) => void; +export type ActorErrorCallback = (error: errors.ActorError) => void; interface SendOpts { ephemeral: boolean; @@ -80,7 +84,7 @@ export class ActorConnRaw { // biome-ignore lint/suspicious/noExplicitAny: Unknown subscription type #eventSubscriptions = new Map>>(); - #errorHandlers = new Set(); + #errorHandlers = new Set(); #rpcIdCounter = 0; @@ -157,7 +161,7 @@ export class ActorConnRaw { this.#rpcIdCounter += 1; const { promise, resolve, reject } = - Promise.withResolvers(); + Promise.withResolvers(); this.#rpcInFlight.set(rpcId, { name, resolve, reject }); this.#sendMessage({ @@ -359,42 +363,52 @@ enc connectionId: this.#connectionId, }); this.#handleOnOpen(); - } else if ("ce" in response.b) { + } else if ("e" in response.b) { // Connection error - const { c: code, m: message, md: metadata } = response.b.ce; + const { c: code, m: message, md: metadata, ri: rpcId } = response.b.e; - logger().warn("actor connection error", { - code, - message, - metadata, - }); + if (rpcId) { + const inFlight = this.#takeRpcInFlight(rpcId); - // Create a connection error - const connectionError = new errors.ConnectionError( - code, - message, - metadata, - ); + logger().warn("rpc error", { + actionId: rpcId, + actionName: inFlight?.name, + code, + message, + metadata, + }); - // If we have an onOpenPromise, reject it with the error - if (this.#onOpenPromise) { - this.#onOpenPromise.reject(connectionError); - } + inFlight.reject(new errors.ActorError(code, message, metadata)); + } else { + logger().warn("connection error", { + code, + message, + metadata, + }); - // Reject any in-flight requests - for (const [id, inFlight] of this.#rpcInFlight.entries()) { - inFlight.reject(connectionError); - this.#rpcInFlight.delete(id); - } + // Create a connection error + const actorError = new errors.ActorError(code, message, metadata); + + // If we have an onOpenPromise, reject it with the error + if (this.#onOpenPromise) { + this.#onOpenPromise.reject(actorError); + } - // Dispatch to error handler if registered - this.#dispatchConnectionError(connectionError); - } else if ("ro" in response.b) { + // Reject any in-flight requests + for (const [id, inFlight] of this.#rpcInFlight.entries()) { + inFlight.reject(actorError); + this.#rpcInFlight.delete(id); + } + + // Dispatch to error handler if registered + this.#dispatchActorError(actorError); + } + } else if ("rr" in response.b) { // RPC response OK - const { i: rpcId } = response.b.ro; + const { i: rpcId, o: outputType } = response.b.rr; logger().trace("received RPC response", { rpcId, - outputType: typeof response.b.ro.o, + outputType, }); const inFlight = this.#takeRpcInFlight(rpcId); @@ -402,38 +416,13 @@ enc rpcId, actionName: inFlight?.name, }); - inFlight.resolve(response.b.ro); - } else if ("re" in response.b) { - // RPC response error - const { i: rpcId, c: code, m: message, md: metadata } = response.b.re; - logger().trace("received RPC error", { rpcId, code, message }); - - const inFlight = this.#takeRpcInFlight(rpcId); - - logger().warn("actor error", { - actionId: rpcId, - actionName: inFlight?.name, - code, - message, - metadata, - }); - - inFlight.reject(new errors.ActionError(code, message, metadata)); + inFlight.resolve(response.b.rr); } else if ("ev" in response.b) { logger().trace("received event", { name: response.b.ev.n, argsCount: response.b.ev.a?.length, }); this.#dispatchEvent(response.b.ev); - } else if ("er" in response.b) { - const { c: code, m: message, md: metadata } = response.b.er; - logger().trace("received error", { code, message }); - - logger().warn("actor error", { - code, - message, - metadata, - }); } else { assertUnreachable(response.b); } @@ -525,7 +514,7 @@ enc return inFlight; } - #dispatchEvent(event: wsToClient.ToClientEvent) { + #dispatchEvent(event: wsToClient.Event) { const { n: name, a: args } = event; const listeners = this.#eventSubscriptions.get(name); @@ -547,7 +536,7 @@ enc } } - #dispatchConnectionError(error: errors.ConnectionError) { + #dispatchActorError(error: errors.ActorError) { // Call all registered error handlers for (const handler of [...this.#errorHandlers]) { try { @@ -626,10 +615,10 @@ enc /** * Subscribes to connection errors. * - * @param {ConnectionErrorCallback} callback - The callback function to execute when a connection error occurs. + * @param {ActorErrorCallback} callback - The callback function to execute when a connection error occurs. * @returns {() => void} - A function to unsubscribe from the error handler. */ - onError(callback: ConnectionErrorCallback): () => void { + onError(callback: ActorErrorCallback): () => void { this.#errorHandlers.add(callback); // Return unsubscribe function @@ -840,17 +829,6 @@ enc } } -type ExtractActorDefinitionRpcs = - AD extends ActorDefinition ? R : never; - -type ActorDefinitionRpcs = { - [K in keyof ExtractActorDefinitionRpcs]: ExtractActorDefinitionRpcs[K] extends ( - ...args: infer Args - ) => infer Return - ? ActorRPCFunction - : never; -}; - /** * Connection to an actor. Allows calling actor's remote procedure calls with inferred types. See {@link ActorConnRaw} for underlying methods. * @@ -869,23 +847,3 @@ type ActorDefinitionRpcs = { export type ActorConn = ActorConnRaw & ActorDefinitionRpcs; - -//{ -// [K in keyof A as K extends string ? K extends `_${string}` ? never : K : K]: A[K] extends (...args: infer Args) => infer Return ? ActorRPCFunction : never; -//}; -/** - * RPC function returned by `ActorConn`. This will call `ActorConn.rpc` when triggered. - * - * @typedef {Function} ActorRPCFunction - * @template Args - * @template Response - * @param {...Args} args - Arguments for the RPC function. - * @returns {Promise} - */ - -export type ActorRPCFunction< - Args extends Array = unknown[], - Response = unknown, -> = ( - ...args: Args extends [unknown, ...infer Rest] ? Rest : Args -) => Promise; diff --git a/packages/actor-core/src/client/actor_handle.ts b/packages/actor-core/src/client/actor_handle.ts new file mode 100644 index 000000000..dc5949010 --- /dev/null +++ b/packages/actor-core/src/client/actor_handle.ts @@ -0,0 +1,104 @@ +import type { Encoding } from "@/actor/protocol/serde"; +import { logger } from "./log"; +import { sendHttpRequest } from "./utils"; +import type { AnyActorDefinition } from "@/actor/definition"; +import type { ActorQuery } from "@/manager/protocol/query"; +import type { ActorDefinitionRpcs } from "./actor_common"; +import type { RpcRequest, RpcResponse } from "@/actor/protocol/http/rpc"; + +/** + * Provides underlying functions for stateless {@link ActorHandle} for RPC calls. + * Similar to ActorConnRaw but doesn't maintain a connection. + * + * @see {@link ActorHandle} + */ +export class ActorHandleRaw { + #endpoint: string; + #encodingKind: Encoding; + #actorQuery: ActorQuery; + + /** + * Do not call this directly. + * + * Creates an instance of ActorHandleRaw. + * + * @param {string} endpoint - The endpoint to connect to. + * + * @protected + */ + public constructor( + endpoint: string, + private readonly params: unknown, + encodingKind: Encoding, + actorQuery: ActorQuery, + ) { + this.#endpoint = endpoint; + this.#encodingKind = encodingKind; + this.#actorQuery = actorQuery; + } + + /** + * Call a raw RPC. This method sends an HTTP request to invoke the named RPC. + * + * NOTE on Implementation: + * The implementation here faces some challenges with the test environment: + * 1. The endpoint path is /actors/rpc/:rpc in the manager router + * 2. The test uses the standalone topology which doesn't properly set up the route + * 3. The server expects specifically formatted JSON array as the request body + * + * In a production environment, this would communicate properly with the endpoints + * defined in manager/router.ts. + * + * @see {@link ActorHandle} + * @template Args - The type of arguments to pass to the RPC function. + * @template Response - The type of the response returned by the RPC function. + * @param {string} name - The name of the RPC function to call. + * @param {...Args} args - The arguments to pass to the RPC function. + * @returns {Promise} - A promise that resolves to the response of the RPC function. + */ + async action = unknown[], Response = unknown>( + name: string, + ...args: Args + ): Promise { + logger().debug("actor handle action", { + name, + args, + query: this.#actorQuery, + }); + + // Build query parameters + let baseUrl = `${this.#endpoint}/actors/rpc/${encodeURIComponent(name)}?encoding=${this.#encodingKind}&query=${encodeURIComponent(JSON.stringify(this.#actorQuery))}`; + if (this.params !== undefined) { + baseUrl += `¶ms=${encodeURIComponent(JSON.stringify(this.params))}`; + } + + // Use the shared HTTP request utility with integrated serialization + const responseData = await sendHttpRequest({ + url: baseUrl, + method: "POST", + body: { a: args } satisfies RpcRequest, + encoding: this.#encodingKind, + }); + + return responseData.o as Response; + } +} + +/** + * Stateless handle to an actor. Allows calling actor's remote procedure calls with inferred types + * without establishing a persistent connection. + * + * @example + * ``` + * const room = client.get(...etc...); + * // This calls the rpc named `sendMessage` on the `ChatRoom` actor without a connection. + * await room.sendMessage('Hello, world!'); + * ``` + * + * Private methods (e.g. those starting with `_`) are automatically excluded. + * + * @template AD The actor class that this handle is for. + * @see {@link ActorHandleRaw} + */ +export type ActorHandle = ActorHandleRaw & + ActorDefinitionRpcs; diff --git a/packages/actor-core/src/client/client.ts b/packages/actor-core/src/client/client.ts index 07a0eead1..ed69c59f2 100644 --- a/packages/actor-core/src/client/client.ts +++ b/packages/actor-core/src/client/client.ts @@ -5,9 +5,13 @@ import * as errors from "./errors"; import { ActorConn, ActorConnRaw, - ActorRPCFunction, CONNECT_SYMBOL, } from "./actor_conn"; +import { + ActorHandle, + ActorHandleRaw +} from "./actor_handle"; +import { ActorRPCFunction } from "./actor_common"; import { logger } from "./log"; import type { ActorCoreApp } from "@/mod"; import type { AnyActorDefinition } from "@/actor/definition"; @@ -24,6 +28,38 @@ export type ExtractAppFromClient>> = * Represents an actor accessor that provides methods to interact with a specific actor. */ export interface ActorAccessor { + /** + * Gets a stateless handle to an actor by its key. + * The actor name is automatically injected from the property accessor. + * + * @template AD The actor class that this handle is for. + * @param {string | string[]} [key=[]] - The key to identify the actor. Can be a single string or an array of strings. + * @param {GetOptions} [opts] - Options for getting the actor. + * @returns {ActorHandle} - A handle to the actor. + */ + get(key?: string | string[], opts?: GetOptions): ActorHandle; + + /** + * Gets a stateless handle to an actor by its ID. + * + * @template AD The actor class that this handle is for. + * @param {string} actorId - The ID of the actor. + * @param {GetWithIdOptions} [opts] - Options for getting the actor. + * @returns {ActorHandle} - A handle to the actor. + */ + getForId(actorId: string, opts?: GetWithIdOptions): ActorHandle; + + /** + * Creates a new actor with the name automatically injected from the property accessor, + * and returns a stateless handle to it. + * + * @template AD The actor class that this handle is for. + * @param {string | string[]} key - The key to identify the actor. Can be a single string or an array of strings. + * @param {CreateOptions} [opts] - Options for creating the actor (excluding name and key). + * @returns {ActorHandle} - A handle to the actor. + */ + create(key: string | string[], opts?: CreateOptions): ActorHandle; + /** * Connects to an actor by its key, creating it if necessary. * The actor name is automatically injected from the property accessor. @@ -319,6 +355,132 @@ export class ClientRaw { return this.#createProxy(conn) as ActorConn; } + /** + * Gets a stateless handle to an actor by its ID. + * + * @template AD The actor class that this handle is for. + * @param {string} name - The name of the actor. + * @param {string} actorId - The ID of the actor. + * @param {GetWithIdOptions} [opts] - Options for getting the actor. + * @returns {ActorHandle} - A handle to the actor. + */ + getForId( + name: string, + actorId: string, + opts?: GetWithIdOptions, + ): ActorHandle { + logger().debug("get handle to actor with id", { + name, + actorId, + params: opts?.params, + }); + + const actorQuery = { + getForId: { + actorId, + }, + }; + + const managerEndpoint = this.#managerEndpoint; + const handle = this.#createHandle(managerEndpoint, opts?.params, actorQuery); + return this.#createHandleProxy(handle) as ActorHandle; + } + + /** + * Gets a stateless handle to an actor by its key. + * + * @template AD The actor class that this handle is for. + * @param {string} name - The name of the actor. + * @param {string | string[]} [key=[]] - The key to identify the actor. Can be a single string or an array of strings. + * @param {GetOptions} [opts] - Options for getting the actor. + * @returns {ActorHandle} - A handle to the actor. + */ + get( + name: string, + key?: string | string[], + opts?: GetOptions, + ): ActorHandle { + // Convert string to array of strings + const keyArray: string[] = typeof key === "string" ? [key] : key || []; + + logger().debug("get handle to actor", { + name, + key: keyArray, + parameters: opts?.params, + noCreate: opts?.noCreate, + createInRegion: opts?.createInRegion, + }); + + let actorQuery: ActorQuery; + if (opts?.noCreate) { + // Use getForKey endpoint if noCreate is specified + actorQuery = { + getForKey: { + name, + key: keyArray, + }, + }; + } else { + // Use getOrCreateForKey endpoint + actorQuery = { + getOrCreateForKey: { + name, + key: keyArray, + region: opts?.createInRegion, + }, + }; + } + + const managerEndpoint = this.#managerEndpoint; + const handle = this.#createHandle( + managerEndpoint, + opts?.params, + actorQuery, + ); + return this.#createHandleProxy(handle) as ActorHandle; + } + + /** + * Creates a new actor with the provided key and returns a stateless handle to it. + * + * @template AD The actor class that this handle is for. + * @param {string} name - The name of the actor. + * @param {string | string[]} key - The key to identify the actor. Can be a single string or an array of strings. + * @param {CreateOptions} [opts] - Options for creating the actor (excluding name and key). + * @returns {ActorHandle} - A handle to the actor. + */ + create( + name: string, + key: string | string[], + opts: CreateOptions = {}, + ): ActorHandle { + // Convert string to array of strings + const keyArray: string[] = typeof key === "string" ? [key] : key; + + // Build create config + const create = { + ...opts, + // Do these last to override `opts` + name, + key: keyArray, + }; + + logger().debug("create actor handle", { + name, + key: keyArray, + parameters: opts?.params, + create, + }); + + const actorQuery = { + create, + }; + + const managerEndpoint = this.#managerEndpoint; + const handle = this.#createHandle(managerEndpoint, opts?.params, actorQuery); + return this.#createHandleProxy(handle) as ActorHandle; + } + #createConn( endpoint: string, params: unknown, @@ -337,6 +499,19 @@ export class ClientRaw { return conn; } + #createHandle( + endpoint: string, + params: unknown, + actorQuery: ActorQuery, + ): ActorHandleRaw { + return new ActorHandleRaw( + endpoint, + params, + this.#encodingKind, + actorQuery, + ); + } + #createProxy( conn: ActorConnRaw, ): ActorConn { @@ -416,6 +591,82 @@ export class ClientRaw { }) as ActorConn; } + #createHandleProxy( + handle: ActorHandleRaw, + ): ActorHandle { + // Stores returned RPC functions for faster calls + const methodCache = new Map(); + return new Proxy(handle, { + get(target: ActorHandleRaw, prop: string | symbol, receiver: unknown) { + // Handle built-in Symbol properties + if (typeof prop === "symbol") { + return Reflect.get(target, prop, receiver); + } + + // Handle built-in Promise methods and existing properties + if ( + prop === "constructor" || + prop in target + ) { + const value = Reflect.get(target, prop, receiver); + // Preserve method binding + if (typeof value === "function") { + return value.bind(target); + } + return value; + } + + // Create RPC function that preserves 'this' context + if (typeof prop === "string") { + let method = methodCache.get(prop); + if (!method) { + method = (...args: unknown[]) => target.action(prop, ...args); + methodCache.set(prop, method); + } + return method; + } + }, + + // Support for 'in' operator + has(target: ActorHandleRaw, prop: string | symbol) { + // All string properties are potentially RPC functions + if (typeof prop === "string") { + return true; + } + // For symbols, defer to the target's own has behavior + return Reflect.has(target, prop); + }, + + // Support instanceof checks + getPrototypeOf(target: ActorHandleRaw) { + return Reflect.getPrototypeOf(target); + }, + + // Prevent property enumeration of non-existent RPC methods + ownKeys(target: ActorHandleRaw) { + return Reflect.ownKeys(target); + }, + + // Support proper property descriptors + getOwnPropertyDescriptor(target: ActorHandleRaw, prop: string | symbol) { + const targetDescriptor = Reflect.getOwnPropertyDescriptor(target, prop); + if (targetDescriptor) { + return targetDescriptor; + } + if (typeof prop === "string") { + // Make RPC methods appear non-enumerable + return { + configurable: true, + enumerable: false, + writable: false, + value: (...args: unknown[]) => target.action(prop, ...args), + }; + } + return undefined; + }, + }) as ActorHandle; + } + /** * Sends an HTTP request to the manager actor. * @private @@ -455,7 +706,7 @@ export class ClientRaw { /** * Disconnects from all actors. * - * @returns {Promise} A promise that resolves when the socket is gracefully closed. + * @returns {Promise} A promise that resolves when all connections are closed. */ async dispose(): Promise { if (this.#disposed) { @@ -467,9 +718,12 @@ export class ClientRaw { logger().debug("disposing client"); const disposePromises = []; + + // Dispose all connections for (const conn of this[ACTOR_CONNS_SYMBOL].values()) { disposePromises.push(conn.dispose()); } + await Promise.all(disposePromises); } } @@ -517,6 +771,39 @@ export function createClient>( if (typeof prop === "string") { // Return actor accessor object with methods return { + // Handle methods (stateless RPC) + get: ( + key?: string | string[], + opts?: GetOptions, + ): ActorHandle[typeof prop]> => { + return target.get[typeof prop]>( + prop, + key, + opts, + ); + }, + getForId: ( + actorId: string, + opts?: GetWithIdOptions, + ): ActorHandle[typeof prop]> => { + return target.getForId[typeof prop]>( + prop, + actorId, + opts, + ); + }, + create: ( + key: string | string[], + opts: CreateOptions = {}, + ): ActorHandle[typeof prop]> => { + return target.create[typeof prop]>( + prop, + key, + opts, + ); + }, + + // Connection methods connect: ( key?: string | string[], opts?: GetOptions, diff --git a/packages/actor-core/src/client/errors.ts b/packages/actor-core/src/client/errors.ts index 7c2400a78..f374a82d9 100644 --- a/packages/actor-core/src/client/errors.ts +++ b/packages/actor-core/src/client/errors.ts @@ -24,7 +24,7 @@ export class MalformedResponseMessage extends ActorClientError { } } -export class ActionError extends ActorClientError { +export class ActorError extends ActorClientError { constructor( public readonly code: string, message: string, @@ -34,15 +34,8 @@ export class ActionError extends ActorClientError { } } -/** - * Error thrown when a connection error occurs. - */ -export class ConnectionError extends ActorClientError { - constructor( - public readonly code: string, - message: string, - public readonly metadata?: unknown, - ) { - super(message); +export class HttpRequestError extends ActorClientError { + constructor(message: string, opts?: { cause?: unknown }) { + super(`HTTP request error: ${message}`, { cause: opts?.cause }); } } diff --git a/packages/actor-core/src/client/mod.ts b/packages/actor-core/src/client/mod.ts index a89c5becd..1b913f574 100644 --- a/packages/actor-core/src/client/mod.ts +++ b/packages/actor-core/src/client/mod.ts @@ -15,6 +15,9 @@ export type { export type { ActorConn } from "./actor_conn"; export { ActorConnRaw } from "./actor_conn"; export type { EventUnsubscribe } from "./actor_conn"; +export type { ActorHandle } from "./actor_handle"; +export { ActorHandleRaw } from "./actor_handle"; +export type { ActorRPCFunction } from "./actor_common"; export type { Transport } from "@/actor/protocol/message/mod"; export type { Encoding } from "@/actor/protocol/serde"; export type { CreateRequest } from "@/manager/protocol/query"; @@ -24,8 +27,7 @@ export { ManagerError, ConnParamsTooLong, MalformedResponseMessage, - ActionError, - ConnectionError, + ActorError, } from "@/client/errors"; export { AnyActorDefinition, diff --git a/packages/actor-core/src/client/utils.ts b/packages/actor-core/src/client/utils.ts index e67cae0c9..38961781a 100644 --- a/packages/actor-core/src/client/utils.ts +++ b/packages/actor-core/src/client/utils.ts @@ -1,4 +1,10 @@ -import { assertUnreachable } from "@/common/utils"; +import { deserialize } from "@/actor/protocol/serde"; +import { assertUnreachable, stringifyError } from "@/common/utils"; +import { Encoding } from "@/mod"; +import * as cbor from "cbor-x"; +import { ActorError, HttpRequestError } from "./errors"; +import { ResponseError } from "@/actor/protocol/http/error"; +import { logger } from "./log"; export type WebSocketMessage = string | Blob | ArrayBuffer | Uint8Array; @@ -17,3 +23,113 @@ export function messageLength(message: WebSocketMessage): number { } assertUnreachable(message); } + +export interface HttpRequestOpts { + method: string; + url: string; + body?: Body; + encoding: Encoding; + skipParseResponse?: boolean; +} + +export async function sendHttpRequest< + RequestBody = unknown, + ResponseBody = unknown, +>(opts: HttpRequestOpts): Promise { + logger().debug("sending http request", { + url: opts.url, + encoding: opts.encoding, + }); + + // Serialize body + let contentType: string | undefined = undefined; + let bodyData: string | Buffer | undefined = undefined; + if (opts.method === "POST" || opts.method === "PUT") { + if (opts.encoding === "json") { + contentType = "application/json"; + bodyData = JSON.stringify(opts.body); + } else if (opts.encoding === "cbor") { + contentType = "application/octet-stream"; + bodyData = cbor.encode(opts.body); + } else { + assertUnreachable(opts.encoding); + } + } + + // Send request + let response: Response; + try { + // Make the HTTP request + response = await fetch(opts.url, { + method: opts.method, + headers: contentType + ? { + "Content-Type": contentType, + } + : {}, + body: bodyData, + }); + } catch (error) { + throw new HttpRequestError(`Request failed: ${error}`, { + cause: error, + }); + } + + // Parse response error + if (!response.ok) { + // Attempt to parse structured data + const bufferResponse = await response.arrayBuffer(); + let responseData: ResponseError; + try { + if (opts.encoding === "json") { + const textResponse = new TextDecoder().decode(bufferResponse); + responseData = JSON.parse(textResponse); + } else if (opts.encoding === "cbor") { + const uint8Array = new Uint8Array(bufferResponse); + responseData = cbor.decode(uint8Array); + } else { + assertUnreachable(opts.encoding); + } + } catch (error) { + //logger().warn("failed to cleanly parse error, this is likely because a non-structured response is being served", { + // error: stringifyError(error), + //}); + + // Error is not structured + const textResponse = new TextDecoder("utf-8", { fatal: false }).decode( + bufferResponse, + ); + throw new HttpRequestError( + `${response.statusText} (${response.status}):\n${textResponse}`, + ); + } + + // Throw structured error + throw new ActorError(responseData.c, responseData.m, responseData.md); + } + + // Some requests don't need the success response to be parsed, so this can speed things up + if (opts.skipParseResponse) { + return undefined as ResponseBody; + } + + // Parse the response based on encoding + let responseBody: ResponseBody; + try { + if (opts.encoding === "json") { + responseBody = (await response.json()) as ResponseBody; + } else if (opts.encoding === "cbor") { + const buffer = await response.arrayBuffer(); + const uint8Array = new Uint8Array(buffer); + responseBody = cbor.decode(uint8Array); + } else { + assertUnreachable(opts.encoding); + } + } catch (error) { + throw new HttpRequestError(`Failed to parse response: ${error}`, { + cause: error, + }); + } + + return responseBody; +} diff --git a/packages/actor-core/src/common/router.ts b/packages/actor-core/src/common/router.ts index 55247e7a3..2ef77eaf1 100644 --- a/packages/actor-core/src/common/router.ts +++ b/packages/actor-core/src/common/router.ts @@ -1,6 +1,9 @@ import type { Context as HonoContext, Next } from "hono"; import { getLogger, Logger } from "./log"; import { deconstructError } from "./utils"; +import { getRequestEncoding } from "@/actor/router_endpoints"; +import { serialize } from "@/actor/protocol/serde"; +import { ResponseError } from "@/actor/protocol/http/error"; export function logger() { return getLogger("router"); @@ -41,5 +44,15 @@ export function handleRouteError(error: unknown, c: HonoContext) { }, ); - return c.json({ code, message, metadata }, { status: statusCode }); + const encoding = getRequestEncoding(c.req); + const output = serialize( + { + c: code, + m: message, + md: metadata, + } satisfies ResponseError, + encoding, + ); + + return c.body(output, { status: statusCode }); } diff --git a/packages/actor-core/src/manager/router.ts b/packages/actor-core/src/manager/router.ts index 86e82f801..beb91ceda 100644 --- a/packages/actor-core/src/manager/router.ts +++ b/packages/actor-core/src/manager/router.ts @@ -179,7 +179,7 @@ export function createManagerRouter( // Serialize and send the connection error const errorMsg: ToClient = { b: { - ce: { + e: { c: code, m: message, md: metadata, @@ -278,7 +278,7 @@ export function createManagerRouter( // Serialize and send the connection error const errorMsg: ToClient = { b: { - ce: { + e: { c: code, m: message, md: metadata, diff --git a/packages/actor-core/tests/actor-handle.test.ts b/packages/actor-core/tests/actor-handle.test.ts new file mode 100644 index 000000000..d7080ffb1 --- /dev/null +++ b/packages/actor-core/tests/actor-handle.test.ts @@ -0,0 +1,129 @@ +import { actor, setup } from "@/mod"; +import { describe, test, expect, vi } from "vitest"; +import { setupTest } from "@/test/mod"; +import { createHash } from "crypto"; + +describe("ActorHandle", () => { + test("basic handle operations", async (c) => { + // Create a simple counter actor + const counter = actor({ + state: { count: 0 }, + actions: { + increment: (c, x: number) => { + c.state.count += x; + return c.state.count; + }, + getCount: (c) => { + return c.state.count; + }, + }, + }); + + const app = setup({ + actors: { counter }, + }); + + const { client } = await setupTest(c, app); + + // Test get (getOrCreate behavior) + const counterHandle = client.counter.get("test-counter"); + expect(counterHandle).toBeDefined(); + + const count = await counterHandle.increment(1); + expect(count).toBe(1); + }); + + test("get with noCreate option", async (c) => { + const counter = actor({ + state: { count: 0 }, + actions: { + increment: (c, x: number) => { + c.state.count += x; + return c.state.count; + }, + }, + }); + + const app = setup({ + actors: { counter }, + }); + + const { client } = await setupTest(c, app); + + // Test handles can be created + const counterHandle1 = client.counter.get("test-counter-nocreate"); + expect(counterHandle1).toBeDefined(); + + const counterHandle2 = client.counter.get("test-counter-nocreate", { + noCreate: true, + }); + expect(counterHandle2).toBeDefined(); + }); + + test("create and getForId", async (c) => { + const counter = actor({ + state: { count: 0 }, + actions: { + increment: (c, x: number) => { + c.state.count += x; + return c.state.count; + }, + getCount: (c) => { + return c.state.count; + }, + getActorId: (c) => { + return c.actorId; + }, + }, + }); + + const app = setup({ + actors: { counter }, + }); + + const { client } = await setupTest(c, app); + + // Check that handles can be created + const createdHandle = client.counter.create("test-counter-create"); + await createdHandle.increment(10); + const actorId = await createdHandle.getActorId(); + + // Get the same actor by ID + const idHandle = client.counter.getForId(actorId); + const count = await idHandle.getCount(); + expect(count).toBe(10); + }); + + test("handles are stateless but access the same actor", async (c) => { + const counter = actor({ + state: { count: 0 }, + actions: { + increment: (c, x: number) => { + c.state.count += x; + return c.state.count; + }, + getCount: (c) => { + return c.state.count; + }, + }, + }); + + const app = setup({ + actors: { counter }, + }); + + const { client } = await setupTest(c, app); + + // Create handles + const handle1 = client.counter.get("test-stateless"); + + const handle2 = client.counter.get("test-stateless"); + + await handle1.increment(1); + await handle2.increment(2); + + // Both handles access the same actor state + const count = await handle1.getCount(); + expect(count).toBe(3); + }); +}); diff --git a/packages/misc/driver-test-suite/fixtures/apps/counter.ts b/packages/misc/driver-test-suite/fixtures/apps/counter.ts index 59a418315..49dc5a07e 100644 --- a/packages/misc/driver-test-suite/fixtures/apps/counter.ts +++ b/packages/misc/driver-test-suite/fixtures/apps/counter.ts @@ -8,6 +8,9 @@ const counter = actor({ c.broadcast("newCount", c.state.count); return c.state.count; }, + getCount: (c) => { + return c.state.count; + }, }, }); diff --git a/packages/misc/driver-test-suite/src/tests/actor-driver.ts b/packages/misc/driver-test-suite/src/tests/actor-driver.ts index c4ec937dc..08aad4cf8 100644 --- a/packages/misc/driver-test-suite/src/tests/actor-driver.ts +++ b/packages/misc/driver-test-suite/src/tests/actor-driver.ts @@ -109,5 +109,53 @@ export function runActorDriverTests(driverTestConfig: DriverTestConfigWithTransp expect(scheduledCount).toBe(1); }); }); + + describe("Actor Handle", () => { + test("stateless handle can perform RPC calls", async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + resolve(__dirname, "../fixtures/apps/counter.ts"), + ); + + // Get a handle to an actor + const counterHandle = client.counter.get("test-handle"); + await counterHandle.increment(1); + await counterHandle.increment(2); + const count = await counterHandle.getCount(); + expect(count).toBe(3); + }); + + test("stateless handles to same actor share state", async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + resolve(__dirname, "../fixtures/apps/counter.ts"), + ); + + // Get a handle to an actor + const handle1 = client.counter.get("test-handle-shared"); + await handle1.increment(5); + + // Get another handle to same actor + const handle2 = client.counter.get("test-handle-shared"); + const count = await handle2.getCount(); + expect(count).toBe(5); + }); + + test("create new actor with handle", async (c) => { + const { client } = await setupDriverTest( + c, + driverTestConfig, + resolve(__dirname, "../fixtures/apps/counter.ts"), + ); + + // Create a new actor with handle + const createdHandle = client.counter.create("test-handle-create"); + await createdHandle.increment(5); + const count = await createdHandle.getCount(); + expect(count).toBe(5); + }); + }); }); } diff --git a/packages/misc/driver-test-suite/src/tests/manager-driver.ts b/packages/misc/driver-test-suite/src/tests/manager-driver.ts index 6b1b63534..b75ce24c5 100644 --- a/packages/misc/driver-test-suite/src/tests/manager-driver.ts +++ b/packages/misc/driver-test-suite/src/tests/manager-driver.ts @@ -1,11 +1,17 @@ import { describe, test, expect, vi } from "vitest"; -import { DriverTestConfigWithTransport, waitFor, type DriverTestConfig } from "@/mod"; +import { + DriverTestConfigWithTransport, + waitFor, + type DriverTestConfig, +} from "@/mod"; import { setupDriverTest } from "@/utils"; import { resolve } from "node:path"; import type { App as CounterApp } from "../../fixtures/apps/counter"; -import { ConnectionError } from "actor-core/client"; +import { ActorError } from "actor-core/client"; -export function runManagerDriverTests(driverTestConfig: DriverTestConfigWithTransport) { +export function runManagerDriverTests( + driverTestConfig: DriverTestConfigWithTransport, +) { describe("Manager Driver Tests", () => { describe("Client Connection Methods", () => { test("connect() - finds or creates an actor", async (c) => { @@ -89,7 +95,7 @@ export function runManagerDriverTests(driverTestConfig: DriverTestConfigWithTran const nonexistentId = `nonexistent-${crypto.randomUUID()}`; // Should fail when actor doesn't exist - let counter1Error: ConnectionError; + let counter1Error: ActorError; const counter1 = client.counter.connect([nonexistentId], { noCreate: true, }); @@ -97,7 +103,7 @@ export function runManagerDriverTests(driverTestConfig: DriverTestConfigWithTran counter1Error = e; }); await vi.waitFor( - () => expect(counter1Error).toBeInstanceOf(ConnectionError), + () => expect(counter1Error).toBeInstanceOf(ActorError), 500, ); await counter1.dispose();