From d572406830f73b6a7282df98dac43bf74d19a25b Mon Sep 17 00:00:00 2001 From: Kacper Wojciechowski <39823706+jog1t@users.noreply.github.com> Date: Wed, 26 Mar 2025 23:09:41 +0100 Subject: [PATCH 1/2] feat: add inspector to manager --- packages/actor-core-cli/package.json | 3 + packages/actor-core-cli/src/cli.ts | 3 +- .../actor-core-cli/src/commands/deploy.tsx | 37 +-- packages/actor-core-cli/src/commands/dev.tsx | 83 +++++++ packages/actor-core-cli/src/mod.ts | 1 + packages/actor-core-cli/src/ui/Workflow.tsx | 21 +- packages/actor-core-cli/src/workflow.tsx | 59 +++-- .../src/workflows/validate-config.ts | 37 +++ packages/actor-core-cli/tsup.config.ts | 1 + packages/actor-core-cli/turbo.json | 4 + packages/actor-core/package.json | 43 +++- packages/actor-core/src/actor/inspect.ts | 235 ------------------ packages/actor-core/src/actor/instance.ts | 6 +- packages/actor-core/src/actor/router.ts | 40 +-- packages/actor-core/src/app/config.ts | 14 +- .../actor-core/src/driver-helpers/config.ts | 7 +- packages/actor-core/src/inspector/actor.ts | 118 +++++++++ packages/actor-core/src/inspector/common.ts | 187 ++++++++++++++ packages/actor-core/src/inspector/config.ts | 16 ++ packages/actor-core/src/inspector/manager.ts | 114 +++++++++ packages/actor-core/src/inspector/mod.ts | 2 + .../protocol/actor}/mod.ts | 0 .../protocol/actor}/to-client.ts | 0 .../protocol/actor}/to-server.ts | 0 .../src/inspector/protocol/manager/mod.ts | 2 + .../inspector/protocol/manager/to-client.ts | 27 ++ .../inspector/protocol/manager/to-server.ts | 13 + packages/actor-core/src/manager/driver.ts | 10 +- packages/actor-core/src/manager/manager.ts | 131 ---------- packages/actor-core/src/manager/mod.ts | 2 +- packages/actor-core/src/manager/router.ts | 133 ++++++++++ .../src/topologies/coordinate/topology.ts | 28 ++- .../src/topologies/partition/toplogy.ts | 95 ++++--- .../src/topologies/standalone/topology.ts | 86 +++++-- packages/create-actor/tsup.config.ts | 1 + packages/create-actor/turbo.json | 12 +- packages/drivers/memory/package.json | 2 +- packages/drivers/memory/src/global_state.ts | 9 +- packages/drivers/memory/src/manager.ts | 18 +- packages/platforms/bun/package.json | 4 +- packages/platforms/bun/src/mod.ts | 25 +- .../platforms/cloudflare-workers/package.json | 2 +- packages/platforms/nodejs/package.json | 2 +- packages/platforms/nodejs/src/mod.ts | 13 +- packages/platforms/rivet/package.json | 2 +- packages/platforms/rivet/src/actor_handler.ts | 8 +- packages/platforms/rivet/src/config.ts | 2 +- .../platforms/rivet/src/manager_handler.ts | 10 +- yarn.lock | 98 +++++++- 49 files changed, 1210 insertions(+), 556 deletions(-) create mode 100644 packages/actor-core-cli/src/commands/dev.tsx create mode 100644 packages/actor-core-cli/src/workflows/validate-config.ts delete mode 100644 packages/actor-core/src/actor/inspect.ts create mode 100644 packages/actor-core/src/inspector/actor.ts create mode 100644 packages/actor-core/src/inspector/common.ts create mode 100644 packages/actor-core/src/inspector/config.ts create mode 100644 packages/actor-core/src/inspector/manager.ts create mode 100644 packages/actor-core/src/inspector/mod.ts rename packages/actor-core/src/{actor/protocol/inspector => inspector/protocol/actor}/mod.ts (100%) rename packages/actor-core/src/{actor/protocol/inspector => inspector/protocol/actor}/to-client.ts (100%) rename packages/actor-core/src/{actor/protocol/inspector => inspector/protocol/actor}/to-server.ts (100%) create mode 100644 packages/actor-core/src/inspector/protocol/manager/mod.ts create mode 100644 packages/actor-core/src/inspector/protocol/manager/to-client.ts create mode 100644 packages/actor-core/src/inspector/protocol/manager/to-server.ts delete mode 100644 packages/actor-core/src/manager/manager.ts create mode 100644 packages/actor-core/src/manager/router.ts diff --git a/packages/actor-core-cli/package.json b/packages/actor-core-cli/package.json index 28be9e018..4847083b2 100644 --- a/packages/actor-core-cli/package.json +++ b/packages/actor-core-cli/package.json @@ -33,9 +33,12 @@ "check-types": "tsc --noEmit" }, "dependencies": { + "@actor-core/nodejs": "workspace:^", "@sentry/profiling-node": "^9.3.0", "bundle-require": "^5.1.0", + "chokidar": "^4.0.3", "esbuild": "^0.25.1", + "open": "^10.1.0", "yoga-wasm-web": "0.3.3" }, "devDependencies": { diff --git a/packages/actor-core-cli/src/cli.ts b/packages/actor-core-cli/src/cli.ts index 6bc7d4e26..ff11779e3 100644 --- a/packages/actor-core-cli/src/cli.ts +++ b/packages/actor-core-cli/src/cli.ts @@ -1,5 +1,5 @@ import { PACKAGE_JSON } from "./macros" with { type: "macro" }; -import { create, deploy, program } from "./mod"; +import { create, deploy, dev, program } from "./mod"; export default program .name(PACKAGE_JSON.name) @@ -7,4 +7,5 @@ export default program .description(PACKAGE_JSON.description) .addCommand(deploy) .addCommand(create) + .addCommand(dev) .parse(); diff --git a/packages/actor-core-cli/src/commands/deploy.tsx b/packages/actor-core-cli/src/commands/deploy.tsx index 53f794f54..bff233981 100644 --- a/packages/actor-core-cli/src/commands/deploy.tsx +++ b/packages/actor-core-cli/src/commands/deploy.tsx @@ -8,7 +8,6 @@ import semver from "semver"; import which from "which"; import { MIN_RIVET_CLI_VERSION } from "../constants"; import { VERSION } from "../macros" with { type: "macro" }; -import { isBundleError, isNotFoundError, validateConfig } from "../utils/mod"; import { workflow } from "../workflow"; import { z } from "zod"; import { RivetClient } from "@rivet-gg/api"; @@ -17,6 +16,7 @@ import { createRivetApi, getServiceToken, } from "../utils/rivet-api"; +import { validateConfigTask } from "../workflows/validate-config"; export const deploy = new Command() .name("deploy") @@ -46,36 +46,7 @@ export const deploy = new Command() const { config, cli } = yield* ctx.task( "Prepare", async function* (ctx) { - const config = yield* ctx.task("Validate config", async () => { - try { - return await validateConfig(cwd); - } catch (error) { - const indexFile = path.relative( - process.cwd(), - path.join(cwd, "src", "index.ts"), - ); - if (isBundleError(error)) { - throw ctx.error( - `Could not parse Actors index file (${indexFile})\n${error.details}`, - { - hint: "Please make sure that the file exists and does not have any syntax errors.", - }, - ); - } else if (isNotFoundError(error)) { - throw ctx.error( - `Could not find Actors index file (${indexFile})`, - { - hint: "Please make sure that the file exists and not empty.", - }, - ); - } else { - console.error(error); - throw ctx.error("Failed to validate config.", { - hint: "Please check the logs above for more information.", - }); - } - } - }); + const config = yield* validateConfigTask(ctx, cwd); const cli = yield* ctx.task( "Locale rivet-cli", @@ -92,7 +63,7 @@ export const deploy = new Command() if (cliLocation) { // check version - const { stdout } = yield* ctx.$`${cliLocation} --version`; + const { stdout } = yield* exec`${cliLocation} --version`; const semVersion = semver.coerce( stdout.split("\n")[2].split(" ")[1].trim(), ); @@ -152,7 +123,7 @@ export const deploy = new Command() const envName = opts.env ?? - (yield* ctx.task("Select environment", async function* () { + (yield* ctx.task("Select environment", async function* (ctx) { const { stdout } = await exec`${cli} env ls --json`; const envs = JSON.parse(stdout); return yield* ctx.prompt("Select environment", { diff --git a/packages/actor-core-cli/src/commands/dev.tsx b/packages/actor-core-cli/src/commands/dev.tsx new file mode 100644 index 000000000..b5ab3d7b0 --- /dev/null +++ b/packages/actor-core-cli/src/commands/dev.tsx @@ -0,0 +1,83 @@ +import * as path from "node:path"; +import { Argument, Command, Option } from "commander"; +import { workflow } from "../workflow"; + +import { validateConfigTask } from "../workflows/validate-config"; +import { serve } from "@actor-core/nodejs"; +import chokidar from "chokidar"; +import { Text } from "ink"; +import open from "open"; +import { withResolvers } from "../utils/mod"; + +export const dev = new Command() + .name("dev") + .description("Run locally your ActorCore project.") + .addArgument(new Argument("[path]", "Location of the project")) + .addOption( + new Option("-p, --port [port]", "Specify which platform to use").default( + "6420", + ), + ) + .addOption( + new Option("--open", "Open the browser with ActorCore Studio").default( + true, + ), + ) + .option("--no-open", "Do not open the browser with ActorCore Studio") + .action(action); + +export async function action( + cmdPath = ".", + opts: { + port?: string; + open?: boolean; + } = {}, +) { + const cwd = path.join(process.cwd(), cmdPath); + await workflow("Run locally your ActorCore project", async function* (ctx) { + let server: ReturnType; + + if (opts.open) { + open( + process.env._ACTOR_CORE_CLI_DEV + ? "http://localhost:43708" + : "http://studio.actorcore.org", + ); + } + + const watcher = chokidar.watch(cwd, { + awaitWriteFinish: true, + ignoreInitial: true, + ignored: (path) => path.includes("node_modules"), + }); + + let lock: ReturnType = withResolvers(); + + watcher.on("all", async (event, path) => { + if (path.includes("node_modules") || path.includes("/.")) return; + + server?.close(); + if (lock) { + lock.resolve(undefined); + lock = withResolvers(); + } + }); + + while (true) { + const config = yield* validateConfigTask(ctx, cwd); + config.app.config.inspector = { + enabled: true, + }; + server = serve(config.app, { + port: Number.parseInt(opts.port || "6420", 10) || 6420, + }); + yield* ctx.task( + "Watching for changes...", + async () => { + await lock.promise; + }, + { success: (Changes detected, restarting!) }, + ); + } + }).render(); +} diff --git a/packages/actor-core-cli/src/mod.ts b/packages/actor-core-cli/src/mod.ts index 7ee146ba0..dc61d70cb 100644 --- a/packages/actor-core-cli/src/mod.ts +++ b/packages/actor-core-cli/src/mod.ts @@ -1,5 +1,6 @@ import "./instrument"; export { deploy } from "./commands/deploy"; export { create, action as createAction } from "./commands/create"; +export { dev } from "./commands/dev"; export { program } from "commander"; export default {}; diff --git a/packages/actor-core-cli/src/ui/Workflow.tsx b/packages/actor-core-cli/src/ui/Workflow.tsx index 85e6dab1d..602243537 100644 --- a/packages/actor-core-cli/src/ui/Workflow.tsx +++ b/packages/actor-core-cli/src/ui/Workflow.tsx @@ -9,7 +9,7 @@ import { import { ExecaError } from "execa"; import { Box, Text, type TextProps } from "ink"; import Spinner from "ink-spinner"; -import { useState } from "react"; +import { type ReactNode, useState } from "react"; import stripAnsi from "strip-ansi"; import { type WorkflowAction, WorkflowError } from "../workflow"; @@ -63,6 +63,7 @@ function Tasks({ interactive?: boolean; }) { const currentTasks = tasks.filter((task) => task.meta.parent === parent); + if (currentTasks.length === 0) { return null; } @@ -70,15 +71,15 @@ function Tasks({ {currentTasks.map((task) => ( - {"status" in task && task.status === "done" ? null : ( + {"status" in task && task.status === "done" && interactive ? null : ( @@ -171,7 +172,11 @@ export function Task({ <> {task.meta.opts?.showLabel === false && task.status !== "error" ? null : ( - + {task.meta.name} )} @@ -209,10 +214,12 @@ export function Status({ value, children, interactive, + done = (Done), ...rest }: TextProps & { value: WorkflowAction.Progress["status"]; interactive?: boolean; + done?: ReactNode; }) { return ( @@ -229,7 +236,7 @@ export function Status({ {" "} {children} {value === "running" && !interactive ? : null} - {value === "done" ? (Done) : null} + {value === "done" ? done : null} ); } @@ -307,7 +314,7 @@ export function Logs({ logs }: { logs: WorkflowAction.Log[] }) { if (log.type === "warn") { return ( - ⚠️ + ⚠︎ {log.message} ); diff --git a/packages/actor-core-cli/src/workflow.tsx b/packages/actor-core-cli/src/workflow.tsx index 97476c60d..cb49897b2 100644 --- a/packages/actor-core-cli/src/workflow.tsx +++ b/packages/actor-core-cli/src/workflow.tsx @@ -12,6 +12,7 @@ interface WorkflowResult { } interface TaskMetadata { + id: string; name: string; parent: string | null; opts?: TaskOptions; @@ -34,7 +35,7 @@ export namespace WorkflowAction { export const progress = ( meta: TaskMetadata, status: "running" | "done" | "error", - res: { error?: unknown; result?: unknown } = {}, + res: { error?: unknown; result?: unknown } & TaskOptions = {}, ): Progress => ({ status, meta, @@ -162,11 +163,12 @@ type UserFnReturnType = | Promise >; -interface Context { +export interface Context { wait: (ms: number) => Promise; task: ( name: string, taskFn: (toolbox: Context) => T, + opts?: TaskOptions, ) => AsyncGenerator< WorkflowAction.All, T extends AsyncGenerator< @@ -205,6 +207,13 @@ interface Context { interface TaskOptions { showLabel?: boolean; + success?: ReactNode; +} + +let TASK_ID = 0; + +function getTaskId() { + return String(TASK_ID++); } export function workflow( @@ -222,17 +231,18 @@ export function workflow( taskFn: (ctx: Context) => T, opts: TaskOptions = {}, ): AsyncGenerator { - const p = WorkflowAction.progress.bind(null, { ...meta, name, opts }); + const id = getTaskId(); + const p = WorkflowAction.progress.bind(null, { ...meta, id, name, opts }); yield p("running"); try { - const output = taskFn(createContext({ ...meta, name, opts })); + const output = taskFn(createContext({ ...meta, id, name, opts })); if (output instanceof Promise) { const result = await output; - yield p("done", { result }); + yield p("done", { result, ...opts }); return result; } const result = yield* output; - yield p("done", { result }); + yield p("done", { result, ...opts }); return result; } catch (error) { yield p("error", { error }); @@ -247,7 +257,8 @@ export function workflow( new Promise((resolve) => setTimeout(resolve, ms)), task: runner.bind(null, { ...meta, - parent: meta.name, + parent: meta.id, + id: "", name: "", }) as Context["task"], render(children: React.ReactNode) { @@ -285,7 +296,7 @@ export function workflow( withResolvers>(); yield WorkflowAction.prompt( - { ...meta, parent: meta.name, name: question }, + { ...meta, id: meta.id, name: question }, question, { answer: null, @@ -297,7 +308,7 @@ export function workflow( const result = await promise; yield WorkflowAction.prompt( - { ...meta, parent: meta.name, name: question }, + { ...meta, id: meta.id, name: question }, question, { answer: result, @@ -324,13 +335,14 @@ export function workflow( > { // task <> parent const parentMap = new Map(); + const id = getTaskId(); try { yield WorkflowAction.progress( - { name: title, parent: null, opts }, + { id, name: title, parent: null, opts }, "running", ); for await (const task of workflowFn( - createContext({ name: title, parent: null }), + createContext({ id, name: title, parent: id }), )) { if (!task || typeof task !== "object") { continue; @@ -338,14 +350,14 @@ export function workflow( if ("__taskProgress" in task) { const parent = task.meta?.parent || title; - parentMap.set(task.meta.name, parent); + parentMap.set(task.meta.id, parent); // Propagate errors up the tree if (task.status === "error") { - let parentTask = parentMap.get(task.meta.name); + let parentTask = parentMap.get(id); while (parentTask) { const grandParent = parentMap.get(parentTask); yield WorkflowAction.progress( - { name: parentTask, parent: grandParent || null }, + { id, name: parentTask, parent: grandParent || null }, "error", ); parentTask = grandParent; @@ -364,13 +376,13 @@ export function workflow( } yield WorkflowAction.progress( - { name: title, parent: null, opts }, + { name: title, parent: null, opts, id }, "done", ); return { success: true }; } catch (error) { yield WorkflowAction.progress( - { name: title, parent: null, opts }, + { name: title, parent: null, opts, id }, "error", { error, @@ -383,10 +395,11 @@ export function workflow( return { title, async render() { + const interactive = !process.env.CI; renderUtils = render( - + , ); @@ -406,8 +419,8 @@ export function workflow( continue; } - const index = tasks.findIndex((t) => t.meta.name === task.meta.name); - if (index === -1) { + const index = tasks.findIndex((t) => t.meta.id === task.meta.id); + if (index === -1 || !interactive) { tasks.push(task); } else { tasks[index] = { ...tasks[index], ...task }; @@ -416,10 +429,16 @@ export function workflow( renderUtils.rerender( - + , ); + + if ("__taskProgress" in task && task.status === "error") { + renderUtils.unmount(); + process.exit(1); + break; + } } for (const hook of hooks.afterAll) { diff --git a/packages/actor-core-cli/src/workflows/validate-config.ts b/packages/actor-core-cli/src/workflows/validate-config.ts new file mode 100644 index 000000000..9d88fe7b0 --- /dev/null +++ b/packages/actor-core-cli/src/workflows/validate-config.ts @@ -0,0 +1,37 @@ +import { + isBundleError, + isNotFoundError, + validateConfig, +} from "../utils/config"; +import path from "node:path"; +import type { Context } from "../workflow"; + +export function validateConfigTask(ctx: Context, cwd: string) { + return ctx.task("Build project", async () => { + try { + return await validateConfig(cwd); + } catch (error) { + const indexFile = path.relative( + process.cwd(), + path.join(cwd, "src", "index.ts"), + ); + if (isBundleError(error)) { + throw ctx.error( + `Could not parse Actors index file (${indexFile})\n${error.details}`, + { + hint: "Please make sure that the file exists and does not have any syntax errors.", + }, + ); + } else if (isNotFoundError(error)) { + throw ctx.error(`Could not find Actors index file (${indexFile})`, { + hint: "Please make sure that the file exists and not empty.", + }); + } else { + console.error(error); + throw ctx.error("Failed to build project config.", { + hint: "Please check the logs above for more information.", + }); + } + } + }); +} diff --git a/packages/actor-core-cli/tsup.config.ts b/packages/actor-core-cli/tsup.config.ts index 85d1138a5..669c4bdd9 100644 --- a/packages/actor-core-cli/tsup.config.ts +++ b/packages/actor-core-cli/tsup.config.ts @@ -31,6 +31,7 @@ export default defineConfig({ "process.env.NODE_ENV": JSON.stringify("production"), "process.env.SENTRY_DSN": JSON.stringify(process.env.SENTRY_DSN || ""), }, + ignoreWatch: ["./tsup.config.bundled*"], banner(ctx) { return { js: `#!/usr/bin/env node${createRequireSnippet}` }; }, diff --git a/packages/actor-core-cli/turbo.json b/packages/actor-core-cli/turbo.json index d6300f6ae..07b3ba3fb 100644 --- a/packages/actor-core-cli/turbo.json +++ b/packages/actor-core-cli/turbo.json @@ -5,6 +5,10 @@ "build": { "inputs": ["src/**", "./tsup.config.ts"], "outputs": ["dist/**"] + }, + "dev": { + "inputs": ["src/**", "./tsup.config.ts"], + "outputs": ["dist/**"] } } } diff --git a/packages/actor-core/package.json b/packages/actor-core/package.json index 0112ba9dd..19cb4ce49 100644 --- a/packages/actor-core/package.json +++ b/packages/actor-core/package.json @@ -102,16 +102,6 @@ "default": "./dist/cli/mod.cjs" } }, - "./protocol/inspector": { - "import": { - "types": "./dist/actor/protocol/inspector/mod.d.ts", - "default": "./dist/actor/protocol/inspector/mod.js" - }, - "require": { - "types": "./dist/actor/protocol/inspector/mod.d.cts", - "default": "./dist/actor/protocol/inspector/mod.cjs" - } - }, "./protocol/http": { "import": { "types": "./dist/actor/protocol/http/rpc.d.ts", @@ -131,11 +121,42 @@ "types": "./dist/test/mod.d.cts", "default": "./dist/test/mod.cjs" } + }, + "./inspector": { + "import": { + "types": "./dist/inspector/mod.d.ts", + "default": "./dist/inspector/mod.js" + }, + "require": { + "types": "./dist/inspector/mod.d.cts", + "default": "./dist/inspector/mod.cjs" + } + }, + "./inspector/protocol/actor": { + "import": { + "types": "./dist/inspector/protocol/actor/mod.d.ts", + "default": "./dist/inspector/protocol/actor/mod.js" + }, + "require": { + "types": "./dist/inspector/protocol/actor/mod.d.cts", + "default": "./dist/inspector/protocol/actor/mod.cjs" + } + }, + "./inspector/protocol/manager": { + "import": { + "types": "./dist/inspector/protocol/manager/mod.d.ts", + "default": "./dist/inspector/protocol/manager/mod.js" + }, + "require": { + "types": "./dist/inspector/protocol/manager/mod.d.cts", + "default": "./dist/inspector/protocol/manager/mod.cjs" + } } }, "sideEffects": false, "scripts": { - "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/cli/mod.ts src/actor/protocol/inspector/mod.ts src/actor/protocol/http/rpc.ts src/test/mod.ts", + "dev": "yarn build --watch", + "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/cli/mod.ts src/actor/protocol/inspector/mod.ts src/actor/protocol/http/rpc.ts src/test/mod.ts src/inspector/protocol/actor/mod.ts src/inspector/protocol/manager/mod.ts src/inspector/mod.ts", "check-types": "tsc --noEmit", "boop": "tsc --outDir dist/test -d", "test": "vitest run", diff --git a/packages/actor-core/src/actor/inspect.ts b/packages/actor-core/src/actor/inspect.ts deleted file mode 100644 index 04d6936d1..000000000 --- a/packages/actor-core/src/actor/inspect.ts +++ /dev/null @@ -1,235 +0,0 @@ -import type { AnyActorInstance } from "@/actor/instance"; -import type { AnyConn, Conn, ConnId } from "@/actor/connection"; -import { throttle } from "@/actor/utils"; -import type { UpgradeWebSocket, WSContext } from "hono/ws"; -import { Hono, type HonoRequest } from "hono"; -import * as errors from "@/actor/errors"; -import { deconstructError, safeStringify } from "@/common/utils"; -import { - type ToServer, - ToServerSchema, -} from "@/actor/protocol/inspector/to-server"; -import type { ToClient } from "@/actor/protocol/inspector/to-client"; -import { logger } from "./log"; - -export interface ConnectInspectorOpts { - req: HonoRequest; -} - -export interface ConnectInspectortOutput { - onOpen: (ws: WSContext) => Promise; - onMessage: (message: ToServer) => Promise; - onClose: () => Promise; -} - -export type InspectorConnHandler = ( - opts: ConnectInspectorOpts, -) => Promise; - -/** - * Create a router for the inspector. - * @internal - */ -export function createInspectorRouter( - upgradeWebSocket: UpgradeWebSocket | undefined, - onConnect: InspectorConnHandler | undefined, -) { - const app = new Hono(); - - if (!upgradeWebSocket || !onConnect) { - return app.get("/", async (c) => { - return c.json({ - error: "Inspector disabled. Only available on WebSocket connections.", - }); - }); - } - return app.get( - "/", - upgradeWebSocket(async (c) => { - try { - const handler = await onConnect({ req: c.req }); - return { - onOpen: async (_, ws) => { - try { - await handler.onOpen(ws); - } catch (error) { - const { code } = deconstructError(error, logger(), { - wsEvent: "open", - }); - ws.close(1011, code); - } - }, - onClose: async () => { - try { - await handler.onClose(); - } catch (error) { - deconstructError(error, logger(), { - wsEvent: "close", - }); - } - }, - onMessage: async (event, ws) => { - try { - const { success, data, error } = ToServerSchema.safeParse( - JSON.parse(event.data.valueOf() as string), - ); - if (!success) throw new errors.MalformedMessage(error); - - await handler.onMessage(data); - } catch (error) { - const { code } = deconstructError(error, logger(), { - wsEvent: "message", - }); - ws.close(1011, code); - } - }, - }; - } catch (error) { - deconstructError(error, logger(), {}); - return {}; - } - }), - ); -} - -/** - * Represents a connection to an actor. - * @internal - */ -export class InspectorConnection { - constructor( - public readonly id: string, - private readonly ws: WSContext, - ) {} - - send(message: ToClient) { - try { - const serialized = safeStringify(message, 128 * 1024 * 1024); - return this.ws.send(serialized); - } catch { - return this.ws.send( - JSON.stringify({ - type: "error", - message: "Failed to serialize message due to size constraints.", - } satisfies ToClient), - ); - } - } -} -/** - * Provides a unified interface for inspecting actor external and internal state. - */ -export class Inspector { - /** - * Inspected actor instance. - * @internal - */ - readonly actor: AnyActorInstance; - - /** - * Map of all connections to the inspector. - * @internal - */ - readonly #connections = new Map(); - - /** - * Connection counter. - */ - #conId = 0; - - /** - * Notify all inspector listeners of an actor's state change. - * @param state - The new state. - * @internal - */ - onStateChange = throttle((state: unknown) => { - this.__broadcast(this.#createInfoMessage()); - }, 500); - - /** - * - * Notify all inspector listeners of an actor's connections change. - * @param connections - The new connections. - * @internal - */ - onConnChange = throttle( - (connections: Map) => { - this.__broadcast(this.#createInfoMessage()); - }, - 500, - ); - - constructor(actor: AnyActorInstance) { - this.actor = actor; - } - - /** - * Broadcast a message to all inspector connections. - * @internal - */ - __broadcast(msg: ToClient) { - for (const conn of this.#connections.values()) { - conn.send(msg); - } - } - - /** - * Process a message from a connection. - * @internal - */ - __processMessage(connection: InspectorConnection, message: ToServer) { - if (message.type === "info") { - return connection.send(this.#createInfoMessage()); - } - if (message.type === "setState") { - this.actor.state = message.state; - return; - } - - throw new errors.Unreachable(message); - } - - /** - * Create an info message for the inspector. - */ - #createInfoMessage(): ToClient { - return { - type: "info", - connections: Array.from(this.actor.conns).map( - ([id, connection]) => ({ - id, - parameters: connection.params, - state: { - value: connection._stateEnabled ? connection.state : undefined, - enabled: connection._stateEnabled, - }, - }), - ), - rpcs: this.actor.rpcs, - state: { - value: this.actor.stateEnabled ? this.actor.state : undefined, - enabled: this.actor.stateEnabled, - }, - }; - } - - /** - * Create a new connection to the inspector. - * Connection will be notified of all state changes. - * @internal - */ - __createConnection(ws: WSContext): InspectorConnection { - const id = `${this.#conId++}`; - const con = new InspectorConnection(id, ws); - this.#connections.set(id, con); - return con; - } - - /** - * Remove a connection from the inspector. - * @internal - */ - __removeConnection(con: InspectorConnection): void { - this.#connections.delete(con.id); - } -} diff --git a/packages/actor-core/src/actor/instance.ts b/packages/actor-core/src/actor/instance.ts index 01c3154af..e3a7c93bf 100644 --- a/packages/actor-core/src/actor/instance.ts +++ b/packages/actor-core/src/actor/instance.ts @@ -15,7 +15,7 @@ import { Schedule } from "./schedule"; import { KEYS } from "./keys"; import type * as wsToServer from "@/actor/protocol/message/to-server"; import { CachedSerializer } from "./protocol/serde"; -import { Inspector } from "@/actor/inspect"; +import { ActorInspector } from "@/inspector/actor"; import { ActorContext } from "./context"; import invariant from "invariant"; @@ -123,7 +123,7 @@ export class ActorInstance { * Inspector for the actor. * @internal */ - inspector!: Inspector; + inspector!: ActorInspector; get id() { return this.#actorId; @@ -156,7 +156,7 @@ export class ActorInstance { this.#tags = tags; this.#region = region; this.#schedule = new Schedule(this, actorDriver); - this.inspector = new Inspector(this); + this.inspector = new ActorInspector(this); // Initialize server // diff --git a/packages/actor-core/src/actor/router.ts b/packages/actor-core/src/actor/router.ts index 3a1c6fed4..ccb252c53 100644 --- a/packages/actor-core/src/actor/router.ts +++ b/packages/actor-core/src/actor/router.ts @@ -1,21 +1,23 @@ -import { Handler, Hono, Context as HonoContext, HonoRequest } from "hono"; -import { ContentfulStatusCode } from "hono/utils/http-status"; -import type { UpgradeWebSocket, WSContext, WSEvents } from "hono/ws"; +import { Hono, type HonoRequest } from "hono"; +import type { UpgradeWebSocket, WSContext } from "hono/ws"; import * as errors from "./errors"; import { logger } from "./log"; import { type Encoding, EncodingSchema } from "@/actor/protocol/serde"; import { parseMessage } from "@/actor/protocol/message/mod"; import * as protoHttpRpc from "@/actor/protocol/http/rpc"; -import * as messageToServer from "@/actor/protocol/message/to-server"; +import type * as messageToServer from "@/actor/protocol/message/to-server"; import type { InputData } from "@/actor/protocol/serde"; -import { SSEStreamingApi, streamSSE } from "hono/streaming"; +import { type SSEStreamingApi, streamSSE } from "hono/streaming"; import { cors } from "hono/cors"; import { assertUnreachable } from "./utils"; -import { createInspectorRouter, InspectorConnHandler } from "./inspect"; import { handleRouteError, handleRouteNotFound } from "@/common/router"; import { deconstructError } from "@/common/utils"; -import { DriverConfig } from "@/driver-helpers/config"; -import { AppConfig } from "@/app/config"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import { + type ActorInspectorConnHandler, + createActorInspectorRouter, +} from "@/inspector/actor"; export interface ConnectWebSocketOpts { req: HonoRequest; @@ -68,7 +70,7 @@ export interface ActorRouterHandler { onConnectSse(opts: ConnectSseOpts): Promise; onRpc(opts: RpcOpts): Promise; onConnMessage(opts: ConnsMessageOpts): Promise; - onConnectInspector?: InspectorConnHandler; + onConnectInspector?: ActorInspectorConnHandler; } /** @@ -116,7 +118,11 @@ export function createActorRouter( throw new Error("onConnectWebSocket is not implemented"); const encoding = getRequestEncoding(c.req); - const parameters = getRequestConnParams(c.req, appConfig, driverConfig); + const parameters = getRequestConnParams( + c.req, + appConfig, + driverConfig, + ); const wsHandler = await handler.onConnectWebSocket({ req: c.req, @@ -354,10 +360,16 @@ export function createActorRouter( } }); - app.route( - "/inspect", - createInspectorRouter(handler.upgradeWebSocket, handler.onConnectInspector), - ); + if (appConfig.inspector.enabled) { + app.route( + "/inspect", + createActorInspectorRouter( + handler.upgradeWebSocket, + handler.onConnectInspector, + appConfig.inspector, + ), + ); + } app.notFound(handleRouteNotFound); app.onError(handleRouteError); diff --git a/packages/actor-core/src/app/config.ts b/packages/actor-core/src/app/config.ts index 4c6eebb32..701f0d958 100644 --- a/packages/actor-core/src/app/config.ts +++ b/packages/actor-core/src/app/config.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import type { cors } from "hono/cors"; import { ActorDefinition, AnyActorDefinition } from "@/actor/definition"; +import { InspectorConfigSchema } from "@/inspector/config"; // Define CORS options schema type CorsOptions = NonNullable[0]>; @@ -41,7 +42,10 @@ export const ActorPeerConfigSchema = z.object({ }); export type ActorPeerConfig = z.infer; -export const ActorsSchema = z.record(z.string(), z.custom>()) +export const ActorsSchema = z.record( + z.string(), + z.custom>(), +); export type Actors = z.infer; /** Base config used for the actor config across all platforms. */ @@ -61,6 +65,12 @@ export const AppConfigSchema = z.object({ /** Peer configuration for coordinated topology. */ actorPeer: ActorPeerConfigSchema.optional().default({}), + + /** Inspector configuration. */ + inspector: InspectorConfigSchema.optional().default({ enabled: false }), }); export type AppConfig = z.infer; -export type AppConfigInput = Omit, "actors"> & { actors: A}; +export type AppConfigInput = Omit< + z.input, + "actors" +> & { actors: A }; diff --git a/packages/actor-core/src/driver-helpers/config.ts b/packages/actor-core/src/driver-helpers/config.ts index f210fed2a..2ffde6757 100644 --- a/packages/actor-core/src/driver-helpers/config.ts +++ b/packages/actor-core/src/driver-helpers/config.ts @@ -10,9 +10,10 @@ import type { Context as HonoContext, Handler as HonoHandler, } from "hono"; -import { CoordinateDriver } from "@/topologies/coordinate/driver"; -import { ManagerDriver } from "@/manager/driver"; -import { ActorDriver } from "@/actor/driver"; +import type { CoordinateDriver } from "@/topologies/coordinate/driver"; +import type { ManagerDriver } from "@/manager/driver"; +import type { ActorDriver } from "@/actor/driver"; +import type { InspectorConnHandler } from "@/inspector/common"; export const TopologySchema = z.enum(["standalone", "partition", "coordinate"]); export type Topology = z.infer; diff --git a/packages/actor-core/src/inspector/actor.ts b/packages/actor-core/src/inspector/actor.ts new file mode 100644 index 000000000..768b62b79 --- /dev/null +++ b/packages/actor-core/src/inspector/actor.ts @@ -0,0 +1,118 @@ +import type { AnyActorInstance } from "@/actor/instance"; +import type { AnyConn, ConnId } from "@/actor/connection"; +import { throttle } from "@/actor/utils"; +import type { UpgradeWebSocket } from "hono/ws"; +import * as errors from "@/actor/errors"; +import { + type ToClient, + type ToServer, + ToServerSchema, +} from "@/inspector/protocol/actor/mod"; +import { logger } from "@/actor/log"; +import { + createInspectorRoute, + Inspector, + type InspectorConnection, + type InspectorConnHandler, +} from "./common"; +import type { InspectorConfig } from "./config"; + +export type ActorInspectorConnHandler = InspectorConnHandler; + +/** + * Create a router for the actor inspector. + * @internal + */ +export function createActorInspectorRouter( + upgradeWebSocket: UpgradeWebSocket | undefined, + onConnect: ActorInspectorConnHandler | undefined, + config: InspectorConfig, +) { + return createInspectorRoute({ + upgradeWebSocket, + onConnect, + config, + logger: logger(), + serverMessageSchema: ToServerSchema, + }); +} + +/** + * Represents a connection to an actor. + * @internal + */ +export type ActorInspectorConnection = InspectorConnection; + +/** + * Provides a unified interface for inspecting actor external and internal state. + */ +export class ActorInspector extends Inspector { + /** + * Inspected actor instance. + * @internal + */ + readonly actor: AnyActorInstance; + + /** + * Notify all inspector listeners of an actor's state change. + * @param state - The new state. + * @internal + */ + onStateChange = throttle((state: unknown) => { + this.broadcast(this.#createInfoMessage()); + }, 500); + + /** + * + * Notify all inspector listeners of an actor's connections change. + * @param connections - The new connections. + * @internal + */ + onConnChange = throttle((connections: Map) => { + this.broadcast(this.#createInfoMessage()); + }, 500); + + constructor(actor: AnyActorInstance) { + super(); + this.actor = actor; + } + + /** + * Process a message from a connection. + * @internal + */ + processMessage(connection: ActorInspectorConnection, message: ToServer) { + super.processMessage(connection, message); + if (message.type === "info") { + return connection.send(this.#createInfoMessage()); + } + if (message.type === "setState") { + this.actor.state = message.state; + return; + } + + throw new errors.Unreachable(message); + } + + /** + * Create an info message for the inspector. + */ + #createInfoMessage(): ToClient { + return { + type: "info", + connections: Array.from(this.actor.conns).map(([id, connection]) => ({ + id, + parameters: connection.params, + state: { + value: connection._stateEnabled ? connection.state : undefined, + enabled: connection._stateEnabled, + }, + })), + rpcs: this.actor.rpcs, + state: { + value: this.actor.stateEnabled ? this.actor.state : undefined, + enabled: this.actor.stateEnabled, + }, + }; + } +} diff --git a/packages/actor-core/src/inspector/common.ts b/packages/actor-core/src/inspector/common.ts new file mode 100644 index 000000000..c2cce6110 --- /dev/null +++ b/packages/actor-core/src/inspector/common.ts @@ -0,0 +1,187 @@ +import type { ConnId } from "@/actor/connection"; +import { deconstructError, safeStringify } from "@/common/utils"; +import { Hono, type HonoRequest } from "hono"; +import type { UpgradeWebSocket, WSContext } from "hono/ws"; +import type { InspectorConfig } from "./config"; +import type { Logger } from "@/common/log"; +import * as errors from "@/actor/errors"; +import type { ZodSchema } from "zod"; + +interface ConnectInspectorOpts { + req: HonoRequest; +} + +export interface ConnectInspectorOutput { + onOpen: (ws: WSContext) => Promise; + onMessage: (message: MsgSchema) => Promise; + onClose: () => Promise; +} + +export type InspectorConnHandler = ( + opts: ConnectInspectorOpts, +) => Promise>; + +/** + * Represents a connection to an actor. + * @internal + */ +export class InspectorConnection { + constructor( + public readonly id: string, + private readonly ws: WSContext, + ) {} + + send(message: MsgSchema) { + try { + const serialized = safeStringify(message, 128 * 1024 * 1024); + return this.ws.send(serialized); + } catch { + return this.ws.send( + JSON.stringify({ + type: "error", + message: "Failed to serialize message due to size constraints.", + }), + ); + } + } +} + +/** + * Provides a unified interface for inspecting actor and managers. + */ +export class Inspector { + /** + * Map of all connections to the inspector. + * @internal + */ + readonly #connections = new Map< + ConnId, + InspectorConnection + >(); + + /** + * Connection counter. + */ + #conId = 0; + + /** + * Broadcast a message to all inspector connections. + * @internal + */ + broadcast(msg: ToClientSchema) { + for (const conn of this.#connections.values()) { + conn.send(msg); + } + } + + /** + * Process a message from a connection. + * @internal + */ + processMessage( + connection: InspectorConnection, + message: ToServerSchema, + ) {} + + /** + * Create a new connection to the inspector. + * Connection will be notified of all state changes. + * @internal + */ + createConnection(ws: WSContext): InspectorConnection { + const id = `${this.#conId++}`; + const con = new InspectorConnection(id, ws); + this.#connections.set(id, con); + return con; + } + + /** + * Remove a connection from the inspector. + * @internal + */ + removeConnection(con: InspectorConnection): void { + this.#connections.delete(con.id); + } +} + +export function createInspectorRoute< + ConnectionHandler extends + InspectorConnHandler, +>({ + upgradeWebSocket, + onConnect, + config, + logger, + serverMessageSchema, +}: { + upgradeWebSocket: UpgradeWebSocket | undefined; + onConnect: ConnectionHandler | undefined; + config: InspectorConfig; + logger: Logger; + serverMessageSchema: ZodSchema; +}) { + const app = new Hono(); + + if (!upgradeWebSocket || !onConnect || !config.enabled) { + return app.get("/", async (c) => { + return c.json({ + error: "Inspector disabled. Only available on WebSocket connections.", + }); + }); + } + + return app.get( + "/", + async (c, next) => { + const result = + (await config.onRequest?.({ req: c.req })) ?? config.enabled; + if (!result) return c.json({ error: "Inspector disabled." }, 403); + return next(); + }, + upgradeWebSocket(async (c) => { + try { + const handler = await onConnect({ req: c.req }); + return { + onOpen: async (_, ws) => { + try { + await handler.onOpen(ws); + } catch (error) { + const { code } = deconstructError(error, logger, { + wsEvent: "open", + }); + ws.close(1011, code); + } + }, + onClose: async () => { + try { + await handler.onClose(); + } catch (error) { + deconstructError(error, logger, { + wsEvent: "close", + }); + } + }, + onMessage: async (event, ws) => { + try { + const { success, data, error } = serverMessageSchema.safeParse( + JSON.parse(event.data.valueOf() as string), + ); + if (!success) throw new errors.MalformedMessage(error); + + await handler.onMessage(data); + } catch (error) { + const { code } = deconstructError(error, logger, { + wsEvent: "message", + }); + ws.close(1011, code); + } + }, + }; + } catch (error) { + deconstructError(error, logger, {}); + return {}; + } + }), + ); +} diff --git a/packages/actor-core/src/inspector/config.ts b/packages/actor-core/src/inspector/config.ts new file mode 100644 index 000000000..badb62068 --- /dev/null +++ b/packages/actor-core/src/inspector/config.ts @@ -0,0 +1,16 @@ +import type { HonoRequest } from "hono"; +import { z } from "zod"; + +export const InspectorConfigSchema = z.object({ + enabled: z.boolean().optional().default(false), + /** + * Handler for incoming requests. + * A best place to add authentication. + */ + onRequest: z + .function() + .args(z.object({ req: z.custom() })) + .returns(z.promise(z.boolean()).or(z.boolean())) + .optional(), +}); +export type InspectorConfig = z.infer; diff --git a/packages/actor-core/src/inspector/manager.ts b/packages/actor-core/src/inspector/manager.ts new file mode 100644 index 000000000..a607d5258 --- /dev/null +++ b/packages/actor-core/src/inspector/manager.ts @@ -0,0 +1,114 @@ +import type { UpgradeWebSocket } from "hono/ws"; +import { + type ToClient, + type ToServer, + ToServerSchema, +} from "@/inspector/protocol/manager/mod"; +import { logger } from "@/manager/log"; +import * as errors from "@/actor/errors"; +import { + createInspectorRoute, + Inspector, + type InspectorConnection, + type InspectorConnHandler, +} from "./common"; +import type { InspectorConfig } from "./config"; +import type { ManagerDriver } from "@/manager/driver"; +import { throttle } from "@/actor/utils"; + +export type ManagerInspectorConnHandler = InspectorConnHandler; + +interface Actor { + id: string; + name: string; + tags: Record; + region?: string; + createdAt?: string; + destroyedAt?: string; +} + +/** + * Create a router for the Manager Inspector. + * @internal + */ +export function createManagerInspectorRouter( + upgradeWebSocket: UpgradeWebSocket | undefined, + onConnect: ManagerInspectorConnHandler | undefined, + config: InspectorConfig, +) { + return createInspectorRoute({ + upgradeWebSocket, + onConnect, + config, + logger: logger(), + serverMessageSchema: ToServerSchema, + }); +} + +/** + * Represents a connection to an actor. + * @internal + */ +export type ManagerInspectorConnection = InspectorConnection; + +/** + * Provides a unified interface for inspecting actor external and internal state. + */ +export class ManagerInspector extends Inspector { + /** + * Inspected actor instance. + * @internal + */ + readonly driver: ManagerDriver; + + /** + * Notify all inspector listeners of an actor's state change. + * @param state - The new state. + */ + public onActorsChange = throttle((actors: Actor[]) => { + this.broadcast({ type: "actors", actors }); + }, 500); + + constructor( + driver: ManagerDriver, + private readonly hooks: { + getAllActors: () => Actor[]; + getAllTypesOfActors: () => string[]; + }, + ) { + super(); + this.driver = driver; + } + + /** + * Process a message from a connection. + * @internal + */ + processMessage(connection: ManagerInspectorConnection, incoming: unknown) { + const result = ToServerSchema.safeParse(incoming); + + if (!result.success) { + logger().warn("Invalid message", result.error); + return connection.send({ + type: "error", + message: "Invalid message", + }); + } + const message = result.data; + + if (message.type === "info") { + return connection.send({ + type: "info", + actors: this.hooks.getAllActors(), + types: this.hooks.getAllTypesOfActors(), + }); + } + + if (message.type === "destroy") { + // TODO + return; + } + + throw new errors.Unreachable(message); + } +} diff --git a/packages/actor-core/src/inspector/mod.ts b/packages/actor-core/src/inspector/mod.ts new file mode 100644 index 000000000..388f1e810 --- /dev/null +++ b/packages/actor-core/src/inspector/mod.ts @@ -0,0 +1,2 @@ +export { ManagerInspector } from "./manager"; +export { ActorInspector } from "./actor"; diff --git a/packages/actor-core/src/actor/protocol/inspector/mod.ts b/packages/actor-core/src/inspector/protocol/actor/mod.ts similarity index 100% rename from packages/actor-core/src/actor/protocol/inspector/mod.ts rename to packages/actor-core/src/inspector/protocol/actor/mod.ts diff --git a/packages/actor-core/src/actor/protocol/inspector/to-client.ts b/packages/actor-core/src/inspector/protocol/actor/to-client.ts similarity index 100% rename from packages/actor-core/src/actor/protocol/inspector/to-client.ts rename to packages/actor-core/src/inspector/protocol/actor/to-client.ts diff --git a/packages/actor-core/src/actor/protocol/inspector/to-server.ts b/packages/actor-core/src/inspector/protocol/actor/to-server.ts similarity index 100% rename from packages/actor-core/src/actor/protocol/inspector/to-server.ts rename to packages/actor-core/src/inspector/protocol/actor/to-server.ts diff --git a/packages/actor-core/src/inspector/protocol/manager/mod.ts b/packages/actor-core/src/inspector/protocol/manager/mod.ts new file mode 100644 index 000000000..0bf544b94 --- /dev/null +++ b/packages/actor-core/src/inspector/protocol/manager/mod.ts @@ -0,0 +1,2 @@ +export * from "./to-client"; +export * from "./to-server"; diff --git a/packages/actor-core/src/inspector/protocol/manager/to-client.ts b/packages/actor-core/src/inspector/protocol/manager/to-client.ts new file mode 100644 index 000000000..75f9c9311 --- /dev/null +++ b/packages/actor-core/src/inspector/protocol/manager/to-client.ts @@ -0,0 +1,27 @@ +import { z } from "zod"; + +const ActorSchema = z.object({ + id: z.string(), + name: z.string(), + tags: z.record(z.string()), +}); + +export type Actor = z.infer; + +export const ToClientSchema = z.discriminatedUnion("type", [ + z.object({ + type: z.literal("info"), + actors: z.array(ActorSchema), + types: z.array(z.string()), + }), + z.object({ + type: z.literal("actors"), + actors: z.array(ActorSchema), + }), + z.object({ + type: z.literal("error"), + message: z.string(), + }), +]); + +export type ToClient = z.infer; diff --git a/packages/actor-core/src/inspector/protocol/manager/to-server.ts b/packages/actor-core/src/inspector/protocol/manager/to-server.ts new file mode 100644 index 000000000..f5b6a749c --- /dev/null +++ b/packages/actor-core/src/inspector/protocol/manager/to-server.ts @@ -0,0 +1,13 @@ +import { z } from "zod"; + +export const ToServerSchema = z.discriminatedUnion("type", [ + z.object({ + type: z.literal("info"), + }), + z.object({ + type: z.literal("destroy"), + actorId: z.string(), + }), +]); + +export type ToServer = z.infer; diff --git a/packages/actor-core/src/manager/driver.ts b/packages/actor-core/src/manager/driver.ts index f409a9c01..09888c39d 100644 --- a/packages/actor-core/src/manager/driver.ts +++ b/packages/actor-core/src/manager/driver.ts @@ -1,12 +1,15 @@ -import { ActorTags } from "@/common/utils"; +import type { ActorTags } from "@/common/utils"; +import type { ManagerInspector } from "@/inspector/manager"; import type { Env, Context as HonoContext } from "hono"; - export interface ManagerDriver { getForId(input: GetForIdInput): Promise; getWithTags(input: GetWithTagsInput): Promise; createActor(input: CreateActorInput): Promise; -}export interface GetForIdInput { + + inspector?: ManagerInspector; +} +export interface GetForIdInput { c?: HonoContext; baseUrl: string; actorId: string; @@ -37,4 +40,3 @@ export interface CreateActorInput { export interface CreateActorOutput { endpoint: string; } - diff --git a/packages/actor-core/src/manager/manager.ts b/packages/actor-core/src/manager/manager.ts deleted file mode 100644 index 82eaa8d22..000000000 --- a/packages/actor-core/src/manager/manager.ts +++ /dev/null @@ -1,131 +0,0 @@ -import { ActorsRequestSchema } from "@/manager/protocol/mod"; -import { Hono, type Context as HonoContext } from "hono"; -import { cors } from "hono/cors"; -import type { ManagerDriver } from "@/manager/driver"; -import { logger } from "./log"; -import { type ActorTags, assertUnreachable } from "@/common/utils"; -import { handleRouteError, handleRouteNotFound } from "@/common/router"; -import { DriverConfig } from "@/driver-helpers/config"; -import { AppConfig } from "@/app/config"; - -export class Manager { - #appConfig: AppConfig; - #driverConfig: DriverConfig; - #driver: ManagerDriver; - - router: Hono; - - public constructor(appConfig: AppConfig, driverConfig: DriverConfig) { - this.#appConfig = appConfig; - this.#driverConfig = driverConfig; - - if (!driverConfig.drivers?.manager) - throw new Error("config.drivers.manager is not defined."); - this.#driver = driverConfig.drivers.manager; - - this.router = this.#buildRouter(); - } - - #buildRouter() { - const app = new Hono(); - - // Apply CORS middleware if configured - if (this.#appConfig.cors) { - app.use("*", cors(this.#appConfig.cors)); - } - - app.get("/", (c) => { - return c.text( - "This is an ActorCore server.\n\nLearn more at https://actorcore.org", - ); - }); - - app.get("/health", (c) => { - return c.text("ok"); - }); - - app.route("/manager", this.#buildManagerRouter()); - - app.notFound(handleRouteNotFound); - app.onError(handleRouteError); - - return app; - } - - #buildManagerRouter(): Hono { - const managerApp = new Hono(); - - managerApp.post("/actors", async (c: HonoContext) => { - const { query } = ActorsRequestSchema.parse(await c.req.json()); - logger().debug("query", { query }); - - const url = new URL(c.req.url); - - // Determine base URL to build endpoints from - // - // This is used to build actor endpoints - let baseUrl = url.origin; - if (this.#appConfig.basePath) { - const basePath = this.#appConfig.basePath; - if (!basePath.startsWith("/")) - throw new Error("config.basePath must start with /"); - if (basePath.endsWith("/")) - throw new Error("config.basePath must not end with /"); - baseUrl += basePath; - } - - // Get the actor from the manager - let actorOutput: { endpoint: string }; - if ("getForId" in query) { - const output = await this.#driver.getForId({ - c, - baseUrl: baseUrl, - actorId: query.getForId.actorId, - }); - if (!output) - throw new Error( - `Actor does not exist for ID: ${query.getForId.actorId}`, - ); - actorOutput = output; - } else if ("getOrCreateForTags" in query) { - const existingActor = await this.#driver.getWithTags({ - c, - baseUrl: baseUrl, - name: query.getOrCreateForTags.name, - tags: query.getOrCreateForTags.tags - }); - if (existingActor) { - // Actor exists - actorOutput = existingActor; - } else { - if (query.getOrCreateForTags.create) { - // Create if needed - actorOutput = await this.#driver.createActor({ - c, - baseUrl: baseUrl, - ...query.getOrCreateForTags.create, - }); - } else { - // Creation disabled - throw new Error("Actor not found with tags or is private."); - } - } - } else if ("create" in query) { - actorOutput = await this.#driver.createActor({ - c, - baseUrl: baseUrl, - ...query.create, - }); - } else { - assertUnreachable(query); - } - - return c.json({ - endpoint: actorOutput.endpoint, - supportedTransports: ["websocket", "sse"], - }); - }); - - return managerApp; - } -} diff --git a/packages/actor-core/src/manager/mod.ts b/packages/actor-core/src/manager/mod.ts index 6480416a3..293b5264c 100644 --- a/packages/actor-core/src/manager/mod.ts +++ b/packages/actor-core/src/manager/mod.ts @@ -1,2 +1,2 @@ export { ManagerDriver } from "./driver"; -export { Manager } from "./manager"; +export { createManagerRouter } from "./router"; diff --git a/packages/actor-core/src/manager/router.ts b/packages/actor-core/src/manager/router.ts new file mode 100644 index 000000000..e5ba6221b --- /dev/null +++ b/packages/actor-core/src/manager/router.ts @@ -0,0 +1,133 @@ +import { ActorsRequestSchema } from "@/manager/protocol/mod"; +import { Hono, type Context as HonoContext } from "hono"; +import { cors } from "hono/cors"; +import { logger } from "./log"; +import { assertUnreachable } from "@/common/utils"; +import { handleRouteError, handleRouteNotFound } from "@/common/router"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import { + createManagerInspectorRouter, + type ManagerInspectorConnHandler, +} from "@/inspector/manager"; +import type { UpgradeWebSocket } from "hono/ws"; + +type ManagerRouterHandler = { + onConnectInspector?: ManagerInspectorConnHandler; + upgradeWebSocket?: UpgradeWebSocket; +}; + +export function createManagerRouter( + appConfig: AppConfig, + driverConfig: DriverConfig, + handler: ManagerRouterHandler, +) { + if (!driverConfig.drivers?.manager) { + // FIXME move to config schema + throw new Error("config.drivers.manager is not defined."); + } + const driver = driverConfig.drivers.manager; + const app = new Hono(); + + // Apply CORS middleware if configured + if (appConfig.cors) { + app.use("*", cors(appConfig.cors)); + } + + app.get("/", (c) => { + return c.text( + "This is an ActorCore server.\n\nLearn more at https://actorcore.org", + ); + }); + + app.get("/health", (c) => { + return c.text("ok"); + }); + + app.post("/manager/actors", async (c: HonoContext) => { + const { query } = ActorsRequestSchema.parse(await c.req.json()); + logger().debug("query", { query }); + + const url = new URL(c.req.url); + + // Determine base URL to build endpoints from + // + // This is used to build actor endpoints + let baseUrl = url.origin; + if (appConfig.basePath) { + const basePath = appConfig.basePath; + if (!basePath.startsWith("/")) + throw new Error("config.basePath must start with /"); + if (basePath.endsWith("/")) + throw new Error("config.basePath must not end with /"); + baseUrl += basePath; + } + + // Get the actor from the manager + let actorOutput: { endpoint: string }; + if ("getForId" in query) { + const output = await driver.getForId({ + c, + baseUrl: baseUrl, + actorId: query.getForId.actorId, + }); + if (!output) + throw new Error( + `Actor does not exist for ID: ${query.getForId.actorId}`, + ); + actorOutput = output; + } else if ("getOrCreateForTags" in query) { + const existingActor = await driver.getWithTags({ + c, + baseUrl: baseUrl, + name: query.getOrCreateForTags.name, + tags: query.getOrCreateForTags.tags, + }); + if (existingActor) { + // Actor exists + actorOutput = existingActor; + } else { + if (query.getOrCreateForTags.create) { + // Create if needed + actorOutput = await driver.createActor({ + c, + baseUrl: baseUrl, + ...query.getOrCreateForTags.create, + }); + } else { + // Creation disabled + throw new Error("Actor not found with tags or is private."); + } + } + } else if ("create" in query) { + actorOutput = await driver.createActor({ + c, + baseUrl: baseUrl, + ...query.create, + }); + } else { + assertUnreachable(query); + } + + return c.json({ + endpoint: actorOutput.endpoint, + supportedTransports: ["websocket", "sse"], + }); + }); + + if (appConfig.inspector.enabled) { + app.route( + "/manager/inspect", + createManagerInspectorRouter( + handler.upgradeWebSocket, + handler.onConnectInspector, + appConfig.inspector, + ), + ); + } + + app.notFound(handleRouteNotFound); + app.onError(handleRouteError); + + return app; +} diff --git a/packages/actor-core/src/topologies/coordinate/topology.ts b/packages/actor-core/src/topologies/coordinate/topology.ts index ee9a66a62..5dab1fc81 100644 --- a/packages/actor-core/src/topologies/coordinate/topology.ts +++ b/packages/actor-core/src/topologies/coordinate/topology.ts @@ -6,12 +6,12 @@ import * as errors from "@/actor/errors"; import * as events from "node:events"; import { publishMessageToLeader } from "./node/message"; import type { RelayConn } from "./conn/mod"; -import type { Hono } from "hono"; +import { Hono } from "hono"; import { createActorRouter } from "@/actor/router"; -import { Manager } from "@/manager/manager"; import { handleRouteError, handleRouteNotFound } from "@/common/router"; -import { DriverConfig } from "@/driver-helpers/config"; -import { AppConfig } from "@/app/config"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import { createManagerRouter } from "@/manager/router"; export interface GlobalState { nodeId: string; @@ -28,7 +28,8 @@ export class CoordinateTopology { constructor(appConfig: AppConfig, driverConfig: DriverConfig) { if (!driverConfig.drivers) throw new Error("config.drivers not defined."); - const { actor: actorDriver, coordinate: CoordinateDriver } = driverConfig.drivers; + const { actor: actorDriver, coordinate: CoordinateDriver } = + driverConfig.drivers; if (!actorDriver) throw new Error("config.drivers.actor not defined."); if (!CoordinateDriver) throw new Error("config.drivers.coordinate not defined."); @@ -47,14 +48,22 @@ export class CoordinateTopology { const node = new Node(CoordinateDriver, globalState); node.start(); - const manager = new Manager(appConfig, driverConfig); + // Build app + const app = new Hono(); - // Build router - const app = manager.router; + const upgradeWebSocket = driverConfig.getUpgradeWebSocket?.(app); + + // Build manager router + const managerRouter = createManagerRouter(appConfig, driverConfig, { + upgradeWebSocket, + onConnectInspector: () => { + throw new errors.Unsupported("inspect"); + }, + }); // Forward requests to actor const actorRouter = createActorRouter(appConfig, driverConfig, { - upgradeWebSocket: driverConfig.getUpgradeWebSocket?.(app), + upgradeWebSocket, onConnectWebSocket: async (opts) => { const actorId = opts.req.param("actorId"); if (!actorId) throw new errors.InternalError("Missing actor ID"); @@ -113,6 +122,7 @@ export class CoordinateTopology { }, }); + app.route("/", managerRouter); app.route("/actors/:actorId", actorRouter); app.notFound(handleRouteNotFound); diff --git a/packages/actor-core/src/topologies/partition/toplogy.ts b/packages/actor-core/src/topologies/partition/toplogy.ts index 9ae57bca7..6934e615a 100644 --- a/packages/actor-core/src/topologies/partition/toplogy.ts +++ b/packages/actor-core/src/topologies/partition/toplogy.ts @@ -1,11 +1,9 @@ -import { Manager } from "@/manager/manager"; import { Hono } from "hono"; import { createActorRouter } from "@/actor/router"; -import { AnyActorInstance } from "@/actor/instance"; +import type { AnyActorInstance } from "@/actor/instance"; import * as errors from "@/actor/errors"; import { - AnyConn, - Conn, + type AnyConn, generateConnId, generateConnToken, } from "@/actor/connection"; @@ -17,22 +15,52 @@ import { CONN_DRIVER_GENERIC_WEBSOCKET, createGenericConnDrivers, GenericConnGlobalState, - GenericHttpDriverState, - GenericSseDriverState, - GenericWebSocketDriverState, + type GenericHttpDriverState, + type GenericSseDriverState, + type GenericWebSocketDriverState, } from "../common/generic-conn-driver"; import type { ConnDriver } from "@/actor/driver"; import type { ActorTags } from "@/common/utils"; -import { InspectorConnection } from "@/actor/inspect"; -import { DriverConfig } from "@/driver-helpers/config"; -import { AppConfig } from "@/app/config"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import type { ActorInspectorConnection } from "@/inspector/actor"; +import { createManagerRouter } from "@/manager/router"; +import type { ManagerInspectorConnection } from "@/inspector/manager"; export class PartitionTopologyManager { - router: Hono; + router = new Hono(); constructor(appConfig: AppConfig, driverConfig: DriverConfig) { - const manager = new Manager(appConfig, driverConfig); - this.router = manager.router; + this.router.route( + "/", + createManagerRouter(appConfig, driverConfig, { + upgradeWebSocket: driverConfig.getUpgradeWebSocket?.(this.router), + onConnectInspector: async () => { + const inspector = driverConfig.drivers?.manager?.inspector; + if (!inspector) throw new errors.Unsupported("inspector"); + + let conn: ManagerInspectorConnection | undefined; + return { + onOpen: async (ws) => { + conn = inspector.createConnection(ws); + }, + onMessage: async (message) => { + if (!conn) { + logger().warn("`conn` does not exist"); + return; + } + + inspector.processMessage(conn, message); + }, + onClose: async () => { + if (conn) { + inspector.removeConnection(conn); + } + }, + }; + }, + }), + ); } } @@ -70,11 +98,7 @@ export class PartitionTopologyActor { "/", createActorRouter(appConfig, driverConfig, { upgradeWebSocket: driverConfig.getUpgradeWebSocket?.(actorRouter), - onConnectWebSocket: async ({ - req, - encoding, - params: connParams, - }) => { + onConnectWebSocket: async ({ req, encoding, params: connParams }) => { if (this.#actorStartedPromise) await this.#actorStartedPromise.promise; @@ -83,10 +107,7 @@ export class PartitionTopologyActor { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn( - connParams, - req.raw, - ); + const connState = await actor.prepareConn(connParams, req.raw); let conn: AnyConn | undefined; return { @@ -133,10 +154,7 @@ export class PartitionTopologyActor { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn( - connParams, - req.raw, - ); + const connState = await actor.prepareConn(connParams, req.raw); let conn: AnyConn | undefined; return { @@ -174,10 +192,7 @@ export class PartitionTopologyActor { if (!actor) throw new Error("Actor should be defined"); // Create conn - const connState = await actor.prepareConn( - connParams, - req.raw, - ); + const connState = await actor.prepareConn(connParams, req.raw); conn = await actor.createConn( generateConnId(), generateConnToken(), @@ -227,10 +242,10 @@ export class PartitionTopologyActor { const actor = this.#actor; if (!actor) throw new Error("Actor should be defined"); - let conn: InspectorConnection | undefined; + let conn: ActorInspectorConnection | undefined; return { onOpen: async (ws) => { - conn = actor.inspector.__createConnection(ws); + conn = actor.inspector.createConnection(ws); }, onMessage: async (message) => { if (!conn) { @@ -238,11 +253,11 @@ export class PartitionTopologyActor { return; } - actor.inspector.__processMessage(conn, message); + actor.inspector.processMessage(conn, message); }, onClose: async () => { if (conn) { - actor.inspector.__removeConnection(conn); + actor.inspector.removeConnection(conn); } }, }; @@ -260,13 +275,21 @@ export class PartitionTopologyActor { // Find actor prototype const definition = this.#appConfig.actors[name]; // TODO: Handle error here gracefully somehow - if (!definition) throw new Error(`no actor in registry for name ${definition}`); + if (!definition) + throw new Error(`no actor in registry for name ${definition}`); // Create actor this.#actor = definition.instantiate(); // Start actor - await this.#actor.start(this.#connDrivers, actorDriver, id, name, tags, region); + await this.#actor.start( + this.#connDrivers, + actorDriver, + id, + name, + tags, + region, + ); this.#actorStartedPromise?.resolve(); this.#actorStartedPromise = undefined; diff --git a/packages/actor-core/src/topologies/standalone/topology.ts b/packages/actor-core/src/topologies/standalone/topology.ts index ccf0baccd..ae4799c45 100644 --- a/packages/actor-core/src/topologies/standalone/topology.ts +++ b/packages/actor-core/src/topologies/standalone/topology.ts @@ -1,13 +1,11 @@ import type { AnyActorInstance } from "@/actor/instance"; -import type { Hono } from "hono"; +import { Hono } from "hono"; import { - AnyConn, - type Conn, + type AnyConn, generateConnId, generateConnToken, } from "@/actor/connection"; import { createActorRouter } from "@/actor/router"; -import { Manager } from "@/manager/manager"; import { logger } from "./log"; import * as errors from "@/actor/errors"; import { @@ -21,8 +19,11 @@ import { type GenericWebSocketDriverState, } from "../common/generic-conn-driver"; import { ActionContext } from "@/actor/action"; -import { DriverConfig } from "@/driver-helpers/config"; -import { AppConfig } from "@/app/config"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import { createManagerRouter } from "@/manager/router"; +import type { ActorInspectorConnection } from "@/inspector/actor"; +import type { ManagerInspectorConnection } from "@/inspector/manager"; class ActorHandler { /** Will be undefined if not yet loaded. */ @@ -55,7 +56,7 @@ export class StandaloneTopology { let handler = this.#actors.get(actorId); if (handler) { if (handler.actorPromise) await handler.actorPromise.promise; - if (!handler.actor) throw new Error("Acotr should be loaded"); + if (!handler.actor) throw new Error("Actor should be loaded"); return { handler, actor: handler.actor }; } @@ -118,15 +119,43 @@ export class StandaloneTopology { if (!driverConfig.drivers?.actor) throw new Error("config.drivers.actor not defined."); - // Create manager - const manager = new Manager(appConfig, driverConfig); - // Build router - const app = manager.router; + const app = new Hono(); + + const upgradeWebSocket = driverConfig.getUpgradeWebSocket?.(app); + + // Build manager router + const managerRouter = createManagerRouter(appConfig, driverConfig, { + upgradeWebSocket, + onConnectInspector: async () => { + const inspector = driverConfig.drivers?.manager?.inspector; + if (!inspector) throw new errors.Unsupported("inspector"); + + let conn: ManagerInspectorConnection | undefined; + return { + onOpen: async (ws) => { + conn = inspector.createConnection(ws); + }, + onMessage: async (message) => { + if (!conn) { + logger().warn("`conn` does not exist"); + return; + } + + inspector.processMessage(conn, message); + }, + onClose: async () => { + if (conn) { + inspector.removeConnection(conn); + } + }, + }; + }, + }); // Build actor router const actorRouter = createActorRouter(appConfig, driverConfig, { - upgradeWebSocket: driverConfig.getUpgradeWebSocket?.(app), + upgradeWebSocket, onConnectWebSocket: async ({ req, encoding, params: connParams }) => { const actorId = req.param("actorId"); if (!actorId) throw new errors.InternalError("Missing actor ID"); @@ -217,10 +246,7 @@ export class StandaloneTopology { const { actor } = await this.#getActor(actorId); // Create conn - const connState = await actor.prepareConn( - connParams, - req.raw, - ); + const connState = await actor.prepareConn(connParams, req.raw); conn = await actor.createConn( generateConnId(), generateConnToken(), @@ -262,11 +288,35 @@ export class StandaloneTopology { // Process message await actor.processMessage(message, conn); }, - onConnectInspector: async () => { - throw new errors.Unsupported("inspect"); + onConnectInspector: async ({ req }) => { + const actorId = req.param("actorId"); + if (!actorId) throw new errors.InternalError("Missing actor ID"); + + const { actor } = await this.#getActor(actorId); + + let conn: ActorInspectorConnection | undefined; + return { + onOpen: async (ws) => { + conn = actor.inspector.createConnection(ws); + }, + onMessage: async (message) => { + if (!conn) { + logger().warn("`conn` does not exist"); + return; + } + + actor.inspector.processMessage(conn, message); + }, + onClose: async () => { + if (conn) { + actor.inspector.removeConnection(conn); + } + }, + }; }, }); + app.route("/", managerRouter); // Mount the actor router app.route("/actors/:actorId", actorRouter); diff --git a/packages/create-actor/tsup.config.ts b/packages/create-actor/tsup.config.ts index 0119e705a..1856077ac 100644 --- a/packages/create-actor/tsup.config.ts +++ b/packages/create-actor/tsup.config.ts @@ -11,6 +11,7 @@ export default defineConfig({ shims: true, dts: false, sourcemap: true, + ignoreWatch: ["./tsup.config.bundled*"], esbuildPlugins: [ // @ts-ignore Macros(), diff --git a/packages/create-actor/turbo.json b/packages/create-actor/turbo.json index 95960709b..07b3ba3fb 100644 --- a/packages/create-actor/turbo.json +++ b/packages/create-actor/turbo.json @@ -1,4 +1,14 @@ { "$schema": "https://turbo.build/schema.json", - "extends": ["//"] + "extends": ["//"], + "tasks": { + "build": { + "inputs": ["src/**", "./tsup.config.ts"], + "outputs": ["dist/**"] + }, + "dev": { + "inputs": ["src/**", "./tsup.config.ts"], + "outputs": ["dist/**"] + } + } } diff --git a/packages/drivers/memory/package.json b/packages/drivers/memory/package.json index 9d9bd07f9..36b3c6853 100644 --- a/packages/drivers/memory/package.json +++ b/packages/drivers/memory/package.json @@ -23,7 +23,7 @@ "check-types": "tsc --noEmit" }, "peerDependencies": { - "actor-core": "workspace:*" + "actor-core": "*" }, "devDependencies": { "actor-core": "workspace:*", diff --git a/packages/drivers/memory/src/global_state.ts b/packages/drivers/memory/src/global_state.ts index f68763561..9cb91ea6a 100644 --- a/packages/drivers/memory/src/global_state.ts +++ b/packages/drivers/memory/src/global_state.ts @@ -50,7 +50,7 @@ export class MemoryGlobalState { * Put a value into KV store */ putKv(actorId: string, serializedKey: string, value: string): void { - let actor = this.#actors.get(actorId); + const actor = this.#actors.get(actorId); if (!actor) { throw new Error(`Actor does not exist for ID: ${actorId}`); } @@ -103,4 +103,11 @@ export class MemoryGlobalState { hasActor(actorId: string): boolean { return this.#actors.has(actorId); } + + /** + * Get all actors + */ + getAllActors(): ActorState[] { + return Array.from(this.#actors.values()); + } } diff --git a/packages/drivers/memory/src/manager.ts b/packages/drivers/memory/src/manager.ts index 6cfa10ab9..93ad125a1 100644 --- a/packages/drivers/memory/src/manager.ts +++ b/packages/drivers/memory/src/manager.ts @@ -7,11 +7,24 @@ import type { ManagerDriver, } from "actor-core/driver-helpers"; import type { MemoryGlobalState } from "./global_state"; +import { ManagerInspector } from "actor-core/inspector"; +import type { ActorCoreApp } from "actor-core"; export class MemoryManagerDriver implements ManagerDriver { #state: MemoryGlobalState; - constructor(state: MemoryGlobalState) { + /** + * @internal + */ + inspector: ManagerInspector = new ManagerInspector(this, { + getAllActors: () => this.#state.getAllActors(), + getAllTypesOfActors: () => Object.keys(this.app.config.actors), + }); + + constructor( + private readonly app: ActorCoreApp, + state: MemoryGlobalState, + ) { this.#state = state; } @@ -63,6 +76,9 @@ export class MemoryManagerDriver implements ManagerDriver { }: CreateActorInput): Promise { const actorId = crypto.randomUUID(); this.#state.createActor(actorId, name, tags); + + this.inspector.onActorsChange(this.#state.getAllActors()); + return { endpoint: buildActorEndpoint(baseUrl, actorId), }; diff --git a/packages/platforms/bun/package.json b/packages/platforms/bun/package.json index a59d8dbc4..ddbab65cb 100644 --- a/packages/platforms/bun/package.json +++ b/packages/platforms/bun/package.json @@ -26,8 +26,8 @@ "check-types": "tsc --noEmit" }, "peerDependencies": { - "@actor-core/memory": "workspace:*", - "actor-core": "workspace:*" + "@actor-core/memory": "*", + "actor-core": "*" }, "devDependencies": { "@actor-core/memory": "workspace:*", diff --git a/packages/platforms/bun/src/mod.ts b/packages/platforms/bun/src/mod.ts index 4dc658f2a..274094828 100644 --- a/packages/platforms/bun/src/mod.ts +++ b/packages/platforms/bun/src/mod.ts @@ -6,11 +6,18 @@ import { logger } from "./log"; import { createBunWebSocket } from "hono/bun"; import type { Hono } from "hono"; import { ActorCoreApp, StandaloneTopology } from "actor-core"; -import { MemoryGlobalState, MemoryManagerDriver, MemoryActorDriver } from "@actor-core/memory"; +import { + MemoryGlobalState, + MemoryManagerDriver, + MemoryActorDriver, +} from "@actor-core/memory"; export { InputConfig as Config } from "./config"; -export function createRouter(app: ActorCoreApp, inputConfig?: InputConfig): { +export function createRouter( + app: ActorCoreApp, + inputConfig?: InputConfig, +): { router: Hono; webSocketHandler: WebSocketHandler; } { @@ -30,7 +37,7 @@ export function createRouter(app: ActorCoreApp, inputConfig?: InputConfig): if (!config.drivers.manager || !config.drivers.actor) { const memoryState = new MemoryGlobalState(); if (!config.drivers.manager) { - config.drivers.manager = new MemoryManagerDriver(memoryState); + config.drivers.manager = new MemoryManagerDriver(app, memoryState); } if (!config.drivers.actor) { config.drivers.actor = new MemoryActorDriver(memoryState); @@ -44,14 +51,17 @@ export function createRouter(app: ActorCoreApp, inputConfig?: InputConfig): } else if (config.topology === "partition") { throw new Error("Bun only supports standalone & coordinate topology."); } else if (config.topology === "coordinate") { - const topology = new CoordinateTopology(app.config,config); + const topology = new CoordinateTopology(app.config, config); return { router: topology.router, webSocketHandler }; } else { assertUnreachable(config.topology); } } -export function createHandler(app: ActorCoreApp, inputConfig?: InputConfig): Serve { +export function createHandler( + app: ActorCoreApp, + inputConfig?: InputConfig, +): Serve { const config = ConfigSchema.parse(inputConfig); const { router, webSocketHandler } = createRouter(app, config); @@ -64,7 +74,10 @@ export function createHandler(app: ActorCoreApp, inputConfig?: InputConfig) }; } -export function serve(app: ActorCoreApp, inputConfig: InputConfig): Server { +export function serve( + app: ActorCoreApp, + inputConfig: InputConfig, +): Server { const config = ConfigSchema.parse(inputConfig); const handler = createHandler(app, config); diff --git a/packages/platforms/cloudflare-workers/package.json b/packages/platforms/cloudflare-workers/package.json index 5acf05719..d786913cc 100644 --- a/packages/platforms/cloudflare-workers/package.json +++ b/packages/platforms/cloudflare-workers/package.json @@ -26,7 +26,7 @@ "check-types": "tsc --noEmit" }, "peerDependencies": { - "actor-core": "workspace:*" + "actor-core": "*" }, "devDependencies": { "@cloudflare/workers-types": "^4.20250129.0", diff --git a/packages/platforms/nodejs/package.json b/packages/platforms/nodejs/package.json index e150dd406..7e811386f 100644 --- a/packages/platforms/nodejs/package.json +++ b/packages/platforms/nodejs/package.json @@ -26,7 +26,7 @@ "check-types": "tsc --noEmit" }, "peerDependencies": { - "actor-core": "workspace:*" + "actor-core": "*" }, "devDependencies": { "actor-core": "workspace:*", diff --git a/packages/platforms/nodejs/src/mod.ts b/packages/platforms/nodejs/src/mod.ts index 93439d921..444c36fc7 100644 --- a/packages/platforms/nodejs/src/mod.ts +++ b/packages/platforms/nodejs/src/mod.ts @@ -4,6 +4,7 @@ import { assertUnreachable } from "actor-core/utils"; import { CoordinateTopology } from "actor-core/topologies/coordinate"; import { logger } from "./log"; import type { Hono } from "hono"; +import { getRouterName, showRoutes } from "hono/dev"; import { StandaloneTopology, type ActorCoreApp } from "actor-core"; import { MemoryGlobalState, @@ -14,7 +15,10 @@ import { type InputConfig, ConfigSchema } from "./config"; export { InputConfig as Config } from "./config"; -export function createRouter(app: ActorCoreApp, inputConfig?: InputConfig): { +export function createRouter( + app: ActorCoreApp, + inputConfig?: InputConfig, +): { router: Hono; injectWebSocket: NodeWebSocket["injectWebSocket"]; } { @@ -25,7 +29,7 @@ export function createRouter(app: ActorCoreApp, inputConfig?: InputConfig): if (!config.drivers.manager || !config.drivers.actor) { const memoryState = new MemoryGlobalState(); if (!config.drivers.manager) { - config.drivers.manager = new MemoryManagerDriver(memoryState); + config.drivers.manager = new MemoryManagerDriver(app, memoryState); } if (!config.drivers.actor) { config.drivers.actor = new MemoryActorDriver(memoryState); @@ -60,7 +64,10 @@ export function createRouter(app: ActorCoreApp, inputConfig?: InputConfig): } } -export function serve(app: ActorCoreApp, inputConfig?: InputConfig): ServerType { +export function serve( + app: ActorCoreApp, + inputConfig?: InputConfig, +): ServerType { const config = ConfigSchema.parse(inputConfig); const { router, injectWebSocket } = createRouter(app, config); diff --git a/packages/platforms/rivet/package.json b/packages/platforms/rivet/package.json index 099e96c1d..cfbef1d17 100644 --- a/packages/platforms/rivet/package.json +++ b/packages/platforms/rivet/package.json @@ -26,7 +26,7 @@ "check-types": "tsc --noEmit" }, "peerDependencies": { - "actor-core": "workspace:*" + "actor-core": "*" }, "devDependencies": { "@rivet-gg/actor-core": "^25.1.0", diff --git a/packages/platforms/rivet/src/actor_handler.ts b/packages/platforms/rivet/src/actor_handler.ts index 5f82ebcd4..a4b996796 100644 --- a/packages/platforms/rivet/src/actor_handler.ts +++ b/packages/platforms/rivet/src/actor_handler.ts @@ -35,10 +35,16 @@ export function createActorHandler(inputConfig: InputConfig): RivetHandler { driverConfig.getUpgradeWebSocket = () => upgradeWebSocket; } + driverConfig.app.config.inspector = { + enabled: true, + // TODO: Add permission check + onRequest: async () => true, + }; + // Create actor topology driverConfig.topology = driverConfig.topology ?? "partition"; const actorTopology = new PartitionTopologyActor( - inputConfig.app.config, + driverConfig.app.config, driverConfig, ); diff --git a/packages/platforms/rivet/src/config.ts b/packages/platforms/rivet/src/config.ts index 8ed58f27b..8e3541ee8 100644 --- a/packages/platforms/rivet/src/config.ts +++ b/packages/platforms/rivet/src/config.ts @@ -3,7 +3,7 @@ import { DriverConfigSchema } from "actor-core/driver-helpers"; import { z } from "zod"; export const ConfigSchema = DriverConfigSchema.extend({ - app: z.custom>() + app: z.custom>(), }); export type InputConfig = z.input; export type Config = z.infer; diff --git a/packages/platforms/rivet/src/manager_handler.ts b/packages/platforms/rivet/src/manager_handler.ts index 2b98051d9..3ad735e38 100644 --- a/packages/platforms/rivet/src/manager_handler.ts +++ b/packages/platforms/rivet/src/manager_handler.ts @@ -35,6 +35,11 @@ export function createManagerHandler(inputConfig: InputConfig): RivetHandler { environment: ctx.metadata.environment.slug, }; + // Force disable inspector + driverConfig.app.config.inspector = { + enabled: false, + }; + // Setup manager driver if (!driverConfig.drivers) driverConfig.drivers = {}; if (!driverConfig.drivers.manager) { @@ -43,7 +48,10 @@ export function createManagerHandler(inputConfig: InputConfig): RivetHandler { // Create manager topology driverConfig.topology = driverConfig.topology ?? "partition"; - const managerTopology = new PartitionTopologyManager(driverConfig.app.config, driverConfig); + const managerTopology = new PartitionTopologyManager( + driverConfig.app.config, + driverConfig, + ); const app = managerTopology.router; diff --git a/yarn.lock b/yarn.lock index a3745409f..9d62d89a0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -18,8 +18,8 @@ __metadata: typescript: "npm:^5.5.2" zod: "npm:^3.24.2" peerDependencies: - "@actor-core/memory": "workspace:*" - actor-core: "workspace:*" + "@actor-core/memory": "*" + actor-core: "*" languageName: unknown linkType: soft @@ -27,6 +27,7 @@ __metadata: version: 0.0.0-use.local resolution: "@actor-core/cli@workspace:packages/actor-core-cli" dependencies: + "@actor-core/nodejs": "workspace:^" "@inkjs/ui": "npm:^2.0.0" "@rivet-gg/api": "npm:^24.6.2" "@sentry/esbuild-plugin": "npm:^3.2.0" @@ -38,6 +39,7 @@ __metadata: "@types/which": "npm:^3.0.4" actor-core: "workspace:*" bundle-require: "npm:^5.1.0" + chokidar: "npm:^4.0.3" commander: "npm:^13.1.0" esbuild: "npm:^0.25.1" execa: "npm:^9.5.2" @@ -47,6 +49,7 @@ __metadata: ink-spinner: "npm:^5.0.0" joycon: "npm:^3.1.1" micromatch: "npm:^4.0.8" + open: "npm:^10.1.0" pkg-types: "npm:^2.0.0" react: "npm:^18.3" semver: "npm:^7.7.1" @@ -77,7 +80,7 @@ __metadata: wrangler: "npm:^3.101.0" zod: "npm:^3.24.2" peerDependencies: - actor-core: "workspace:*" + actor-core: "*" languageName: unknown linkType: soft @@ -104,11 +107,11 @@ __metadata: tsup: "npm:^8.4.0" typescript: "npm:^5.5.2" peerDependencies: - actor-core: "workspace:*" + actor-core: "*" languageName: unknown linkType: soft -"@actor-core/nodejs@workspace:packages/platforms/nodejs": +"@actor-core/nodejs@workspace:^, @actor-core/nodejs@workspace:packages/platforms/nodejs": version: 0.0.0-use.local resolution: "@actor-core/nodejs@workspace:packages/platforms/nodejs" dependencies: @@ -121,7 +124,7 @@ __metadata: typescript: "npm:^5.5.2" zod: "npm:^3.24.2" peerDependencies: - actor-core: "workspace:*" + actor-core: "*" languageName: unknown linkType: soft @@ -170,7 +173,7 @@ __metadata: typescript: "npm:^5.5.2" zod: "npm:^3.24.2" peerDependencies: - actor-core: "workspace:*" + actor-core: "*" languageName: unknown linkType: soft @@ -3863,6 +3866,15 @@ __metadata: languageName: node linkType: hard +"bundle-name@npm:^4.1.0": + version: 4.1.0 + resolution: "bundle-name@npm:4.1.0" + dependencies: + run-applescript: "npm:^7.0.0" + checksum: 10c0/8e575981e79c2bcf14d8b1c027a3775c095d362d1382312f444a7c861b0e21513c0bd8db5bd2b16e50ba0709fa622d4eab6b53192d222120305e68359daece29 + languageName: node + linkType: hard + "bundle-require@npm:^5.1.0": version: 5.1.0 resolution: "bundle-require@npm:5.1.0" @@ -4362,6 +4374,30 @@ __metadata: languageName: node linkType: hard +"default-browser-id@npm:^5.0.0": + version: 5.0.0 + resolution: "default-browser-id@npm:5.0.0" + checksum: 10c0/957fb886502594c8e645e812dfe93dba30ed82e8460d20ce39c53c5b0f3e2afb6ceaec2249083b90bdfbb4cb0f34e1f73fde3d68cac00becdbcfd894156b5ead + languageName: node + linkType: hard + +"default-browser@npm:^5.2.1": + version: 5.2.1 + resolution: "default-browser@npm:5.2.1" + dependencies: + bundle-name: "npm:^4.1.0" + default-browser-id: "npm:^5.0.0" + checksum: 10c0/73f17dc3c58026c55bb5538749597db31f9561c0193cd98604144b704a981c95a466f8ecc3c2db63d8bfd04fb0d426904834cfc91ae510c6aeb97e13c5167c4d + languageName: node + linkType: hard + +"define-lazy-prop@npm:^3.0.0": + version: 3.0.0 + resolution: "define-lazy-prop@npm:3.0.0" + checksum: 10c0/5ab0b2bf3fa58b3a443140bbd4cd3db1f91b985cc8a246d330b9ac3fc0b6a325a6d82bddc0b055123d745b3f9931afeea74a5ec545439a1630b9c8512b0eeb49 + languageName: node + linkType: hard + "defu@npm:^6.1.4": version: 6.1.4 resolution: "defu@npm:6.1.4" @@ -5662,6 +5698,15 @@ __metadata: languageName: node linkType: hard +"is-docker@npm:^3.0.0": + version: 3.0.0 + resolution: "is-docker@npm:3.0.0" + bin: + is-docker: cli.js + checksum: 10c0/d2c4f8e6d3e34df75a5defd44991b6068afad4835bb783b902fa12d13ebdb8f41b2a199dcb0b5ed2cb78bfee9e4c0bbdb69c2d9646f4106464674d3e697a5856 + languageName: node + linkType: hard + "is-extglob@npm:^2.1.1": version: 2.1.1 resolution: "is-extglob@npm:2.1.1" @@ -5710,6 +5755,17 @@ __metadata: languageName: node linkType: hard +"is-inside-container@npm:^1.0.0": + version: 1.0.0 + resolution: "is-inside-container@npm:1.0.0" + dependencies: + is-docker: "npm:^3.0.0" + bin: + is-inside-container: cli.js + checksum: 10c0/a8efb0e84f6197e6ff5c64c52890fa9acb49b7b74fed4da7c95383965da6f0fa592b4dbd5e38a79f87fc108196937acdbcd758fcefc9b140e479b39ce1fcd1cd + languageName: node + linkType: hard + "is-network-error@npm:^1.0.0": version: 1.1.0 resolution: "is-network-error@npm:1.1.0" @@ -5752,6 +5808,15 @@ __metadata: languageName: node linkType: hard +"is-wsl@npm:^3.1.0": + version: 3.1.0 + resolution: "is-wsl@npm:3.1.0" + dependencies: + is-inside-container: "npm:^1.0.0" + checksum: 10c0/d3317c11995690a32c362100225e22ba793678fe8732660c6de511ae71a0ff05b06980cf21f98a6bf40d7be0e9e9506f859abe00a1118287d63e53d0a3d06947 + languageName: node + linkType: hard + "isarray@npm:~1.0.0": version: 1.0.0 resolution: "isarray@npm:1.0.0" @@ -6541,6 +6606,18 @@ __metadata: languageName: node linkType: hard +"open@npm:^10.1.0": + version: 10.1.0 + resolution: "open@npm:10.1.0" + dependencies: + default-browser: "npm:^5.2.1" + define-lazy-prop: "npm:^3.0.0" + is-inside-container: "npm:^1.0.0" + is-wsl: "npm:^3.1.0" + checksum: 10c0/c86d0b94503d5f735f674158d5c5d339c25ec2927562f00ee74590727292ed23e1b8d9336cb41ffa7e1fa4d3641d29b199b4ea37c78cb557d72b511743e90ebb + languageName: node + linkType: hard + "p-limit@npm:^3.0.2": version: 3.1.0 resolution: "p-limit@npm:3.1.0" @@ -7431,6 +7508,13 @@ __metadata: languageName: node linkType: hard +"run-applescript@npm:^7.0.0": + version: 7.0.0 + resolution: "run-applescript@npm:7.0.0" + checksum: 10c0/bd821bbf154b8e6c8ecffeaf0c33cebbb78eb2987476c3f6b420d67ab4c5301faa905dec99ded76ebb3a7042b4e440189ae6d85bbbd3fc6e8d493347ecda8bfe + languageName: node + linkType: hard + "safe-buffer@npm:~5.1.0, safe-buffer@npm:~5.1.1": version: 5.1.2 resolution: "safe-buffer@npm:5.1.2" From f01ef1dd569eb355290e14c37abcd6f14ebde262 Mon Sep 17 00:00:00 2001 From: Kacper Wojciechowski <39823706+jog1t@users.noreply.github.com> Date: Fri, 4 Apr 2025 01:58:11 +0200 Subject: [PATCH 2/2] fix: inspector --- examples/chat-room/package.json | 6 +-- examples/counter/package.json | 6 +-- packages/actor-core-cli/src/commands/dev.tsx | 48 ++++++++++++++------ packages/actor-core-cli/src/server-entry.ts | 17 +++++++ packages/actor-core-cli/src/workflow.tsx | 16 ++++--- packages/actor-core-cli/tsup.config.ts | 2 +- packages/platforms/nodejs/src/mod.ts | 1 - 7 files changed, 68 insertions(+), 28 deletions(-) create mode 100644 packages/actor-core-cli/src/server-entry.ts diff --git a/examples/chat-room/package.json b/examples/chat-room/package.json index dd4ac11b8..0de4dcfa7 100644 --- a/examples/chat-room/package.json +++ b/examples/chat-room/package.json @@ -4,6 +4,7 @@ "private": true, "type": "module", "scripts": { + "dev": "npx @actor-core/cli@latest dev", "check-types": "tsc --noEmit", "test": "vitest run" }, @@ -11,15 +12,14 @@ "@types/node": "^22.13.9", "@types/prompts": "^2", "actor-core": "workspace:*", + "@actor-core/cli": "workspace:*", "prompts": "^2.4.2", "tsx": "^3.12.7", "typescript": "^5.5.2", "vitest": "^3.0.9" }, "example": { - "platforms": [ - "*" - ], + "platforms": ["*"], "actors": { "chat-room": "src/chat-room.ts" } diff --git a/examples/counter/package.json b/examples/counter/package.json index 19dd4f9e5..71e869032 100644 --- a/examples/counter/package.json +++ b/examples/counter/package.json @@ -4,19 +4,19 @@ "private": true, "type": "module", "scripts": { + "dev": "npx @actor-core/cli@latest dev", "check-types": "tsc --noEmit", "test": "vitest run" }, "devDependencies": { "@types/node": "^22.13.9", "actor-core": "workspace:*", + "@actor-core/cli": "workspace:*", "tsx": "^3.12.7", "typescript": "^5.7.3", "vitest": "^3.0.9" }, "example": { - "platforms": [ - "*" - ] + "platforms": ["*"] } } diff --git a/packages/actor-core-cli/src/commands/dev.tsx b/packages/actor-core-cli/src/commands/dev.tsx index b5ab3d7b0..e5a7ccc62 100644 --- a/packages/actor-core-cli/src/commands/dev.tsx +++ b/packages/actor-core-cli/src/commands/dev.tsx @@ -3,11 +3,11 @@ import { Argument, Command, Option } from "commander"; import { workflow } from "../workflow"; import { validateConfigTask } from "../workflows/validate-config"; -import { serve } from "@actor-core/nodejs"; import chokidar from "chokidar"; import { Text } from "ink"; import open from "open"; import { withResolvers } from "../utils/mod"; +import { spawn } from "node:child_process"; export const dev = new Command() .name("dev") @@ -35,8 +35,6 @@ export async function action( ) { const cwd = path.join(process.cwd(), cmdPath); await workflow("Run locally your ActorCore project", async function* (ctx) { - let server: ReturnType; - if (opts.open) { open( process.env._ACTOR_CORE_CLI_DEV @@ -51,26 +49,48 @@ export async function action( ignored: (path) => path.includes("node_modules"), }); - let lock: ReturnType = withResolvers(); + function createServer() { + return spawn( + process.execPath, + [ + path.join( + path.dirname(require.resolve("@actor-core/cli")), + "server-entry.js", + ), + ], + { env: { ...process.env, PORT: opts.port }, cwd }, + ); + } - watcher.on("all", async (event, path) => { - if (path.includes("node_modules") || path.includes("/.")) return; + let server: ReturnType | undefined = undefined; + let lock: ReturnType = withResolvers(); - server?.close(); + function createLock() { if (lock) { lock.resolve(undefined); - lock = withResolvers(); } + lock = withResolvers(); + } + + watcher.on("all", async (_, path) => { + if (path.includes("node_modules") || path.includes("/.")) return; + + server?.kill(); }); while (true) { - const config = yield* validateConfigTask(ctx, cwd); - config.app.config.inspector = { - enabled: true, - }; - server = serve(config.app, { - port: Number.parseInt(opts.port || "6420", 10) || 6420, + yield* validateConfigTask(ctx, cwd); + server = createServer(); + createLock(); + + server?.addListener("exit", () => { + lock.resolve(undefined); }); + + server?.addListener("close", () => { + lock.resolve(undefined); + }); + yield* ctx.task( "Watching for changes...", async () => { diff --git a/packages/actor-core-cli/src/server-entry.ts b/packages/actor-core-cli/src/server-entry.ts new file mode 100644 index 000000000..12b6c952a --- /dev/null +++ b/packages/actor-core-cli/src/server-entry.ts @@ -0,0 +1,17 @@ +import { validateConfig } from "./utils/config"; +import { serve } from "@actor-core/nodejs"; + +async function run() { + const config = await validateConfig(process.cwd()); + config.app.config.inspector = { + enabled: true, + }; + serve(config.app, { + port: Number.parseInt(process.env.PORT || "6420", 10) || 6420, + }); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/packages/actor-core-cli/src/workflow.tsx b/packages/actor-core-cli/src/workflow.tsx index cb49897b2..8695b8168 100644 --- a/packages/actor-core-cli/src/workflow.tsx +++ b/packages/actor-core-cli/src/workflow.tsx @@ -257,8 +257,7 @@ export function workflow( new Promise((resolve) => setTimeout(resolve, ms)), task: runner.bind(null, { ...meta, - parent: meta.id, - id: "", + parent: meta.parent, name: "", }) as Context["task"], render(children: React.ReactNode) { @@ -292,11 +291,12 @@ export function workflow( WorkflowAction.Prompt.One, WorkflowAction.Prompt.Answer > { + const id = getTaskId(); const { promise, resolve, reject } = withResolvers>(); yield WorkflowAction.prompt( - { ...meta, id: meta.id, name: question }, + { ...meta, id, name: question }, question, { answer: null, @@ -308,7 +308,7 @@ export function workflow( const result = await promise; yield WorkflowAction.prompt( - { ...meta, id: meta.id, name: question }, + { ...meta, id, name: question }, question, { answer: result, @@ -353,11 +353,15 @@ export function workflow( parentMap.set(task.meta.id, parent); // Propagate errors up the tree if (task.status === "error") { - let parentTask = parentMap.get(id); + let parentTask = parentMap.get(task.meta.id); while (parentTask) { const grandParent = parentMap.get(parentTask); yield WorkflowAction.progress( - { id, name: parentTask, parent: grandParent || null }, + { + id: parentTask, + name: parentTask, + parent: grandParent || null, + }, "error", ); parentTask = grandParent; diff --git a/packages/actor-core-cli/tsup.config.ts b/packages/actor-core-cli/tsup.config.ts index 669c4bdd9..9fbbc1cb4 100644 --- a/packages/actor-core-cli/tsup.config.ts +++ b/packages/actor-core-cli/tsup.config.ts @@ -11,7 +11,7 @@ const __dirname = topLevelFileURLToPath(new topLevelURL(".", import.meta.url)); `; export default defineConfig({ - entry: ["src/mod.ts", "src/cli.ts"], + entry: ["src/mod.ts", "src/cli.ts", "src/server-entry.ts"], platform: "node", bundle: true, format: "esm", diff --git a/packages/platforms/nodejs/src/mod.ts b/packages/platforms/nodejs/src/mod.ts index 444c36fc7..126dc9f5d 100644 --- a/packages/platforms/nodejs/src/mod.ts +++ b/packages/platforms/nodejs/src/mod.ts @@ -4,7 +4,6 @@ import { assertUnreachable } from "actor-core/utils"; import { CoordinateTopology } from "actor-core/topologies/coordinate"; import { logger } from "./log"; import type { Hono } from "hono"; -import { getRouterName, showRoutes } from "hono/dev"; import { StandaloneTopology, type ActorCoreApp } from "actor-core"; import { MemoryGlobalState,