Skip to content

feat: add stateless get/create/getWithId #956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/actor-core/src/actor/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ export class ActionContext<S, CP, CS, V> {
return this.#actorContext.log;
}

/**
* Gets actor ID.
*/
get actorId(): string {
return this.#actorContext.actorId;
}

/**
* Gets the actor name.
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/actor-core/src/actor/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CachedSerializer } from "./protocol/serde";
import type { ConnDriver } from "./driver";
import * as messageToClient from "@/actor/protocol/message/to-client";
import type { PersistedConn } from "./persisted";
import * as wsToClient from "@/actor/protocol/message/to-client";

export function generateConnId(): string {
return crypto.randomUUID();
Expand Down Expand Up @@ -135,7 +136,7 @@ export class Conn<S, CP, CS, V> {
*/
public send(eventName: string, ...args: unknown[]) {
this._sendMessage(
new CachedSerializer({
new CachedSerializer<wsToClient.ToClient>({
b: {
ev: {
n: eventName,
Expand Down
17 changes: 7 additions & 10 deletions packages/actor-core/src/actor/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { Conn, ConnId } from "./connection";
import { ActorKey } from "@/common/utils";
import { Schedule } from "./schedule";


/**
* ActorContext class that provides access to actor methods and state
*/
Expand Down Expand Up @@ -36,7 +35,6 @@ export class ActorContext<S, CP, CS, V> {
* @param args - The arguments to send with the event.
*/
broadcast<Args extends Array<unknown>>(name: string, ...args: Args): void {
// @ts-ignore - Access protected method
this.#actor._broadcast(name, ...args);
return;
}
Expand All @@ -45,47 +43,48 @@ export class ActorContext<S, CP, CS, V> {
* Gets the logger instance.
*/
get log(): Logger {
// @ts-ignore - Access protected method
return this.#actor.log;
}

/**
* Gets actor ID.
*/
get actorId(): string {
return this.#actor.id;
}

/**
* Gets the actor name.
*/
get name(): string {
// @ts-ignore - Access protected method
return this.#actor.name;
}

/**
* Gets the actor key.
*/
get key(): ActorKey {
// @ts-ignore - Access protected method
return this.#actor.key;
}

/**
* Gets the region.
*/
get region(): string {
// @ts-ignore - Access protected method
return this.#actor.region;
}

/**
* Gets the scheduler.
*/
get schedule(): Schedule {
// @ts-ignore - Access protected method
return this.#actor.schedule;
}

/**
* Gets the map of connections.
*/
get conns(): Map<ConnId, Conn<S, CP, CS, V>> {
// @ts-ignore - Access protected method
return this.#actor.conns;
}

Expand All @@ -95,7 +94,6 @@ export class ActorContext<S, CP, CS, V> {
* @param opts - Options for saving the state.
*/
async saveState(opts: SaveStateOptions): Promise<void> {
// @ts-ignore - Access protected method
return this.#actor.saveState(opts);
}

Expand All @@ -105,7 +103,6 @@ export class ActorContext<S, CP, CS, V> {
* @param promise - The promise to run in the background.
*/
runInBackground(promise: Promise<void>): void {
// @ts-ignore - Access protected method
this.#actor._runInBackground(promise);
return;
}
Expand Down
5 changes: 3 additions & 2 deletions packages/actor-core/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { instanceLogger, logger } from "./log";
import type { ActionContext } from "./action";
import { DeadlineError, Lock, deadline } from "./utils";
import { Schedule } from "./schedule";
import * as wsToClient from "@/actor/protocol/message/to-client";
import type * as wsToServer from "@/actor/protocol/message/to-server";
import { CachedSerializer } from "./protocol/serde";
import { ActorInspector } from "@/inspector/actor";
Expand Down Expand Up @@ -716,7 +717,7 @@ export class ActorInstance<S, CP, CS, V> {

// Send init message
conn._sendMessage(
new CachedSerializer({
new CachedSerializer<wsToClient.ToClient>({
b: {
i: {
ci: `${conn.id}`,
Expand Down Expand Up @@ -1019,7 +1020,7 @@ export class ActorInstance<S, CP, CS, V> {
const subscriptions = this.#subscriptionIndex.get(name);
if (!subscriptions) return;

const toClientSerializer = new CachedSerializer({
const toClientSerializer = new CachedSerializer<wsToClient.ToClient>({
b: {
ev: {
n: name,
Expand Down
12 changes: 12 additions & 0 deletions packages/actor-core/src/actor/protocol/http/error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { z } from "zod";

export const ResponseErrorSchema = z.object({
// Code
c: z.string(),
// Message
m: z.string(),
// Metadata
md: z.unknown().optional(),
});

export type ResponseError = z.infer<typeof ResponseErrorSchema>;
17 changes: 4 additions & 13 deletions packages/actor-core/src/actor/protocol/http/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
import { z } from "zod";

export const RequestSchema = z.object({
export const RpcRequestSchema = z.object({
// Args
a: z.array(z.unknown()),
});

export const ResponseOkSchema = z.object({
export const RpcResponseSchema = z.object({
// Output
o: z.unknown(),
});

export const ResponseErrSchema = z.object({
// Code
c: z.string(),
// Message
m: z.string(),
// Metadata
md: z.unknown().optional(),
});

export type Request = z.infer<typeof RequestSchema>;
export type ResponseOk = z.infer<typeof ResponseOkSchema>;
export type ResponseErr = z.infer<typeof ResponseErrSchema>;
export type RpcRequest = z.infer<typeof RpcRequestSchema>;
export type RpcResponse = z.infer<typeof RpcResponseSchema>;
38 changes: 12 additions & 26 deletions packages/actor-core/src/actor/protocol/message/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export async function processMessage<S, CP, CS, V>(
conn._sendMessage(
new CachedSerializer<wsToClient.ToClient>({
b: {
ro: {
rr: {
i: id,
o: output,
},
Expand Down Expand Up @@ -179,32 +179,18 @@ export async function processMessage<S, CP, CS, V>(
});

// Build response
if (rpcId !== undefined) {
conn._sendMessage(
new CachedSerializer({
b: {
re: {
i: rpcId,
c: code,
m: message,
md: metadata,
},
conn._sendMessage(
new CachedSerializer<wsToClient.ToClient>({
b: {
e: {
c: code,
m: message,
md: metadata,
ri: rpcId,
},
}),
);
} else {
conn._sendMessage(
new CachedSerializer({
b: {
er: {
c: code,
m: message,
md: metadata,
},
},
}),
);
}
},
}),
);

logger().debug("error response sent", { rpcId, rpcName });
}
Expand Down
44 changes: 11 additions & 33 deletions packages/actor-core/src/actor/protocol/message/to-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,64 +9,42 @@ export const InitSchema = z.object({
});

// Used for connection errors (both during initialization and afterwards)
export const ConnectionErrorSchema = z.object({
export const ErrorSchema = z.object({
// Code
c: z.string(),
// Message
m: z.string(),
// Metadata
md: z.unknown().optional(),
// RPC ID
ri: z.number().int().optional(),
});

export const RpcResponseOkSchema = z.object({
export const RpcResponseSchema = z.object({
// ID
i: z.number().int(),
// Output
o: z.unknown(),
});

export const RpcResponseErrorSchema = z.object({
// ID
i: z.number().int(),
// Code
c: z.string(),
// Message
m: z.string(),
// Metadata
md: z.unknown().optional(),
});

export const ToClientEventSchema = z.object({
export const EventSchema = z.object({
// Name
n: z.string(),
// Args
a: z.array(z.unknown()),
});

export const ToClientErrorSchema = z.object({
// Code
c: z.string(),
// Message
m: z.string(),
// Metadata
md: z.unknown().optional(),
});

export const ToClientSchema = z.object({
// Body
b: z.union([
z.object({ i: InitSchema }),
z.object({ ce: ConnectionErrorSchema }),
z.object({ ro: RpcResponseOkSchema }),
z.object({ re: RpcResponseErrorSchema }),
z.object({ ev: ToClientEventSchema }),
z.object({ er: ToClientErrorSchema }),
z.object({ e: ErrorSchema }),
z.object({ rr: RpcResponseSchema }),
z.object({ ev: EventSchema }),
]),
});

export type ToClient = z.infer<typeof ToClientSchema>;
export type ConnectionError = z.infer<typeof ConnectionErrorSchema>;
export type RpcResponseOk = z.infer<typeof RpcResponseOkSchema>;
export type RpcResponseError = z.infer<typeof RpcResponseErrorSchema>;
export type ToClientEvent = z.infer<typeof ToClientEventSchema>;
export type ToClientError = z.infer<typeof ToClientErrorSchema>;
export type Error = z.infer<typeof ErrorSchema>;
export type RpcResponse = z.infer<typeof RpcResponseSchema>;
export type Event = z.infer<typeof EventSchema>;
2 changes: 1 addition & 1 deletion packages/actor-core/src/actor/protocol/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export type Encoding = z.infer<typeof EncodingSchema>;
/**
* Helper class that helps serialize data without re-serializing for the same encoding.
*/
export class CachedSerializer<T = unknown> {
export class CachedSerializer<T> {
#data: T;
#cache = new Map<Encoding, OutputData>();

Expand Down
4 changes: 3 additions & 1 deletion packages/actor-core/src/actor/router_endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ export async function handleRpc(
const encoding = getRequestEncoding(c.req);
const parameters = getRequestConnParams(c.req, appConfig, driverConfig);

logger().debug("handling rpc", { rpcName, encoding });

// Validate incoming request
let rpcArgs: unknown[];
if (encoding === "json") {
Expand All @@ -271,7 +273,7 @@ export async function handleRpc(
);

// Validate using the RPC schema
const result = protoHttpRpc.RequestSchema.safeParse(deserialized);
const result = protoHttpRpc.RpcRequestSchema.safeParse(deserialized);
if (!result.success) {
throw new errors.InvalidRpcRequest("Invalid RPC request format");
}
Expand Down
29 changes: 29 additions & 0 deletions packages/actor-core/src/client/actor_common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type { AnyActorDefinition, ActorDefinition } from "@/actor/definition";

/**
* RPC function returned by Actor connections and handles.
*
* @typedef {Function} ActorRPCFunction
* @template Args
* @template Response
* @param {...Args} args - Arguments for the RPC function.
* @returns {Promise<Response>}
*/
export type ActorRPCFunction<
Args extends Array<unknown> = unknown[],
Response = unknown,
> = (
...args: Args extends [unknown, ...infer Rest] ? Rest : Args
) => Promise<Response>;

/**
* Maps RPC methods from actor definition to typed function signatures.
*/
export type ActorDefinitionRpcs<AD extends AnyActorDefinition> =
AD extends ActorDefinition<any, any, any, any, infer R> ? {
[K in keyof R]: R[K] extends (
...args: infer Args
) => infer Return
? ActorRPCFunction<Args, Return>
: never;
} : never;
Loading
Loading