Skip to content

Commit 5a49f14

Browse files
committed
refactor: move redis p2p logic to generic driver
1 parent 8ce1fa1 commit 5a49f14

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1238
-1084
lines changed

CLAUDE.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Actor-Core Development Guide
2+
3+
## Build Commands
4+
5+
- **Build:** `yarn build` - Production build
6+
- **Dev:** `yarn dev` - Watch mode for development
7+
- **Format:** `yarn fmt` - Format code with Biome
8+
- **Type Check:** `yarn check-types` - Verify TypeScript types
9+
10+
## Code Style Guidelines
11+
12+
- **Formatting:** Uses Biome for consistent formatting
13+
- **Imports:** Organized imports enforced, unused imports warned
14+
- **TypeScript:** Strict mode enabled, target ESNext
15+
- **Naming:**
16+
- camelCase for variables, functions
17+
- PascalCase for classes, interfaces, types
18+
- UPPER_CASE for constants
19+
- Use `#` prefix for private class members (not `private` keyword)
20+
- **Error Handling:**
21+
- Extend from `ActorError` base class
22+
- Use `UserError` for client-safe errors
23+
- Use `InternalError` for internal errors
24+
25+
## Project Structure
26+
27+
- Monorepo with Yarn workspaces and Turborepo
28+
- Core code in `packages/actor-core/`
29+
- Platform implementations in `packages/platforms/`
30+
- Driver implementations in `packages/drivers/`
31+
32+
## Development Notes
33+
34+
- Prefer classes over factory functions
35+
- Use zod for runtime type validation
36+
- Use `assertUnreachable(x: never)` for exhaustive type checking in switch statements
37+
- Follow existing patterns for P2P networking
38+
- Add proper JSDoc comments for public APIs
39+
- Ensure proper error handling with descriptive messages
40+
- Verify type safety with `yarn check-types` before committing
41+

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"workspaces": [
66
"packages/*",
77
"packages/platforms/*",
8+
"packages/drivers/*",
89
"packages/misc/*",
910
"examples/*",
1011
"examples/*/platforms/*"

packages/actor-core/package.json

Lines changed: 26 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -41,91 +41,60 @@
4141
"default": "./dist/common/log.cjs"
4242
}
4343
},
44-
"./platform": {
45-
"import": {
46-
"types": "./dist/platform.d.ts",
47-
"default": "./dist/platform.js"
48-
},
49-
"require": {
50-
"types": "./dist/platform.d.cts",
51-
"default": "./dist/platform.cjs"
52-
}
53-
},
54-
"./actor/generic": {
55-
"import": {
56-
"types": "./dist/actor/generic/mod.d.ts",
57-
"default": "./dist/actor/generic/mod.js"
58-
},
59-
"require": {
60-
"types": "./dist/actor/generic/mod.d.cts",
61-
"default": "./dist/actor/generic/mod.cjs"
62-
}
63-
},
64-
"./actor/protocol/serde": {
65-
"import": {
66-
"types": "./dist/actor/protocol/serde.d.ts",
67-
"default": "./dist/actor/protocol/serde.js"
68-
},
69-
"require": {
70-
"types": "./dist/actor/protocol/serde.d.cts",
71-
"default": "./dist/actor/protocol/serde.cjs"
72-
}
73-
},
74-
"./actor/protocol/message": {
44+
"./actor/errors": {
7545
"import": {
76-
"types": "./dist/actor/protocol/message/mod.d.ts",
77-
"default": "./dist/actor/protocol/message/mod.js"
46+
"types": "./dist/actor/errors.d.ts",
47+
"default": "./dist/actor/errors.js"
7848
},
7949
"require": {
80-
"types": "./dist/actor/protocol/message/mod.d.cts",
81-
"default": "./dist/actor/protocol/message/mod.cjs"
50+
"types": "./dist/actor/errors.d.cts",
51+
"default": "./dist/actor/errors.cjs"
8252
}
8353
},
84-
"./actor/protocol/message/to_server": {
54+
"./utils": {
8555
"import": {
86-
"types": "./dist/actor/protocol/message/to_server.d.ts",
87-
"default": "./dist/actor/protocol/message/to_server.js"
56+
"types": "./dist/utils.d.ts",
57+
"default": "./dist/utils.js"
8858
},
8959
"require": {
90-
"types": "./dist/actor/protocol/message/to_server.d.cts",
91-
"default": "./dist/actor/protocol/message/to_server.cjs"
60+
"types": "./dist/utils.d.cts",
61+
"default": "./dist/utils.cjs"
9262
}
9363
},
94-
"./actor/protocol/message/to_client": {
64+
"./driver-helpers": {
9565
"import": {
96-
"types": "./dist/actor/protocol/message/to_client.d.ts",
97-
"default": "./dist/actor/protocol/message/to_client.js"
66+
"types": "./dist/driver-helpers.d.ts",
67+
"default": "./dist/driver-helpers.js"
9868
},
9969
"require": {
100-
"types": "./dist/actor/protocol/message/to_client.d.cts",
101-
"default": "./dist/actor/protocol/message/to_client.cjs"
70+
"types": "./dist/driver-helpers.d.cts",
71+
"default": "./dist/driver-helpers.cjs"
10272
}
10373
},
104-
"./actor/errors": {
74+
"./topologies/p2p": {
10575
"import": {
106-
"types": "./dist/actor/errors.d.ts",
107-
"default": "./dist/actor/errors.js"
76+
"types": "./dist/topologies/p2p/mod.d.ts",
77+
"default": "./dist/topologies/p2p/mod.js"
10878
},
10979
"require": {
110-
"types": "./dist/actor/errors.d.cts",
111-
"default": "./dist/actor/errors.cjs"
80+
"types": "./dist/topologies/p2p/mod.d.cts",
81+
"default": "./dist/topologies/p2p/mod.cjs"
11282
}
11383
},
114-
"./manager/protocol": {
84+
"./topologies/isolated": {
11585
"import": {
116-
"types": "./dist/manager/protocol/mod.d.ts",
117-
"default": "./dist/manager/protocol/mod.js"
86+
"types": "./dist/topologies/isolated/mod.d.ts",
87+
"default": "./dist/topologies/isolated/mod.js"
11888
},
11989
"require": {
120-
"types": "./dist/manager/protocol/mod.d.cts",
121-
"default": "./dist/manager/protocol/mod.cjs"
90+
"types": "./dist/topologies/isolated/mod.d.cts",
91+
"default": "./dist/topologies/isolated/mod.cjs"
12292
}
12393
}
12494
},
12595
"sideEffects": false,
12696
"scripts": {
127-
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/platform.ts src/actor/generic/mod.ts src/actor/protocol/serde.ts src/actor/protocol/message/mod.ts src/actor/protocol/message/to_server.ts src/actor/protocol/message/to_client.ts src/actor/errors.ts src/manager/protocol/mod.ts",
128-
"dev": "yarn build --watch",
97+
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/actor/errors.ts src/topologies/p2p/mod.ts src/topologies/isolated/mod.ts src/utils.ts src/driver-helpers.ts",
12998
"check-types": "tsc --noEmit"
13099
},
131100
"dependencies": {

packages/actor-core/src/actor/runtime/actor.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import type { WSContext, WSEvents } from "hono/ws";
1111
import onChange from "on-change";
1212
import { type ActorConfig, mergeActorConfig } from "./actor_config";
1313
import { Connection, type ConnectionId } from "./connection";
14-
import type { ActorDriver, ConnectionDriver } from "./driver";
14+
import type { ActorDriver, ConnectionDrivers } from "./driver";
15+
import type { ConnectionDriver } from "./driver";
1516
import * as errors from "../errors";
1617
import { parseMessage, processMessage } from "../protocol/message/mod";
1718
import { instanceLogger, logger } from "./log";
@@ -153,7 +154,8 @@ export abstract class Actor<
153154

154155
#backgroundPromises: Promise<void>[] = [];
155156
#config: ActorConfig;
156-
#driver!: ActorDriver;
157+
#connectionDrivers!: ConnectionDrivers;
158+
#actorDriver!: ActorDriver;
157159
#actorId!: string;
158160
#tags!: ActorTags;
159161
#region!: string;
@@ -186,16 +188,18 @@ export abstract class Actor<
186188
}
187189

188190
async __start(
189-
driver: ActorDriver,
191+
connectionDrivers: ConnectionDrivers,
192+
actorDriver: ActorDriver,
190193
actorId: string,
191194
tags: ActorTags,
192195
region: string,
193196
) {
194-
this.#driver = driver;
197+
this.#connectionDrivers = connectionDrivers;
198+
this.#actorDriver = actorDriver;
195199
this.#actorId = actorId;
196200
this.#tags = tags;
197201
this.#region = region;
198-
this.#schedule = new Schedule(this, driver);
202+
this.#schedule = new Schedule(this, actorDriver);
199203

200204
// Initialize server
201205
//
@@ -270,7 +274,7 @@ export abstract class Actor<
270274
this.#persistChanged = false;
271275

272276
// Write to KV
273-
await this.#driver.kvPut(KEYS.STATE.DATA, this.#persistRaw);
277+
await this.#actorDriver.kvPut(this.#actorId, KEYS.STATE.DATA, this.#persistRaw);
274278

275279
logger().debug("persist saved");
276280
});
@@ -339,7 +343,7 @@ export abstract class Actor<
339343
async #initialize() {
340344
// Read initial state
341345
const [[_i, initialized], [_s, persistData]] =
342-
(await this.#driver.kvGetBatch([
346+
(await this.#actorDriver.kvGetBatch(this.#actorId, [
343347
KEYS.STATE.INITIALIZED,
344348
KEYS.STATE.DATA,
345349
])) as [
@@ -400,7 +404,7 @@ export abstract class Actor<
400404

401405
// Update state
402406
logger().debug("writing state");
403-
await this.#driver.kvPutBatch([
407+
await this.#actorDriver.kvPutBatch(this.#actorId, [
404408
[KEYS.STATE.INITIALIZED, true],
405409
[KEYS.STATE.DATA, persist],
406410
]);
@@ -473,7 +477,7 @@ export abstract class Actor<
473477

474478
__getConnectionDriver(driverId: string): ConnectionDriver {
475479
// Get driver
476-
const driver = this.#driver.connectionDrivers[driverId];
480+
const driver = this.#connectionDrivers[driverId];
477481
if (!driver) throw new Error(`No connection driver: ${driverId}`);
478482
return driver;
479483
}

packages/actor-core/src/actor/runtime/actor_router.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export interface ActorRouterHandler {
7777
export function createActorRouter(
7878
config: BaseConfig,
7979
handler: ActorRouterHandler,
80-
) {
80+
): Hono {
8181
const app = new Hono();
8282

8383
app.get("/", (c) => {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,69 @@
1+
import { P2PDriver } from "@/driver-helpers";
2+
import type { AnyActorConstructor } from "./actor";
3+
import { ActorDriver, ManagerDriver } from "./driver";
4+
import type { Hono, Context as HonoContext, Handler as HonoHandler } from "hono";
5+
16
export const DEFAULT_ROUTER_MAX_CONNECTION_PARAMETER_SIZE = 8_192;
27
export const DEFAULT_ROUTER_MAX_INCOMING_MESSAGE_SIZE = 65_536;
38

9+
export const DEFAULT_ACTOR_PEER_LEASE_DURATION = 3000;
10+
export const DEFAULT_ACTOR_PEER_RENEW_LEASE_GRACE = 1500;
11+
export const DEFAULT_ACTOR_PEER_CHECK_LEASE_INTERVAL = 1000;
12+
export const DEFAULT_ACTOR_PEER_CHECK_LEASE_JITTER = 500;
13+
export const DEFAULT_ACTOR_PEER_MESSAGE_ACK_TIMEOUT = 1000;
14+
15+
export type Topology = "single" | "isolated" | "p2p";
16+
417
/** Base config used for the actor config across all platforms. */
518
export interface BaseConfig {
19+
topology: Topology;
20+
actors: Record<string, AnyActorConstructor>;
21+
drivers: {
22+
manager: ManagerDriver;
23+
actor: ActorDriver;
24+
p2p?: P2PDriver;
25+
};
626
router?: {
27+
// This is dynamic since NodeJS requires a reference to the app to initialize WebSockets
28+
getUpgradeWebSocket?: (
29+
app: Hono,
30+
) => (createEvents: (c: HonoContext) => any) => HonoHandler;
31+
732
/** This goes in the URL so it needs to be short. */
833
maxConnectionParametersSize?: number;
934

1035
maxIncomingMessageSize?: number;
1136
};
37+
actorPeer?: {
38+
/**
39+
* How long the actor leader holds a lease for.
40+
*
41+
* Milliseconds
42+
**/
43+
leaseDuration?: number;
44+
/**
45+
* How long before the lease will expire to issue the renew command.
46+
*
47+
* Milliseconds
48+
*/
49+
renewLeaseGrace?: number;
50+
/**
51+
* How frequently the followers check if the leader is still active.
52+
*
53+
* Milliseconds
54+
*/
55+
checkLeaseInterval?: number;
56+
/**
57+
* Positive jitter for check lease interval
58+
*
59+
* Milliseconds
60+
*/
61+
checkLeaseJitter?: number;
62+
/**
63+
* How long to wait for a message ack.
64+
*
65+
* Milliseconds
66+
*/
67+
messageAckTimeout?: number;
68+
};
1269
}

packages/actor-core/src/actor/runtime/driver.ts

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,52 @@
1-
import { CachedSerializer } from "../protocol/serde";
1+
import type { ActorTags, Connection } from "./mod";
22
import type * as messageToClient from "@/actor/protocol/message/to_client";
3-
import { AnyActor } from "./actor";
4-
import { Connection } from "./connection";
5-
6-
export interface LoadOutput {
7-
actor: {
8-
id: string;
9-
tags: Record<string, string>;
10-
createdAt: Date;
11-
};
12-
region: string;
3+
import type { CachedSerializer } from "@/actor/protocol/serde";
4+
import type { AnyActor } from "./actor";
5+
6+
export type ConnectionDrivers = Record<string, ConnectionDriver>;
7+
8+
export interface GetForIdInput {
9+
origin: string;
10+
actorId: string;
1311
}
1412

15-
export interface ActorDriver {
16-
connectionDrivers: Record<string, ConnectionDriver>;
13+
export interface GetWithTagsInput {
14+
origin: string;
15+
tags: ActorTags;
16+
}
17+
18+
export interface CreateActorInput {
19+
origin: string;
20+
region?: string;
21+
tags: ActorTags;
22+
}
1723

24+
export interface GetActorOutput {
25+
endpoint: string;
26+
}
27+
28+
export interface ManagerDriver {
29+
getForId(input: GetForIdInput): Promise<GetActorOutput>;
30+
getWithTags(input: GetWithTagsInput): Promise<GetActorOutput | undefined>;
31+
createActor(input: CreateActorInput): Promise<GetActorOutput>;
32+
}
33+
34+
export type KvKey = unknown[];
35+
export type KvValue = unknown;
36+
37+
export interface ActorDriver {
1838
//load(): Promise<LoadOutput>;
1939

2040
// HACK: Clean these up
21-
kvGet(key: any): Promise<any>;
22-
kvGetBatch(key: any[]): Promise<[any, any][]>;
23-
kvPut(key: any, value: any): Promise<void>;
24-
kvPutBatch(key: [any, any][]): Promise<void>;
25-
kvDelete(key: any): Promise<void>;
26-
kvDeleteBatch(key: any[]): Promise<void>;
41+
kvGet(actorId: string, key: KvKey): Promise<KvValue | undefined>;
42+
kvGetBatch(actorId: string, key: KvKey[]): Promise<(KvValue | undefined)[]>;
43+
kvPut(actorId: string, key: KvKey, value: KvValue): Promise<void>;
44+
kvPutBatch(actorId: string, key: [KvKey, KvValue][]): Promise<void>;
45+
kvDelete(actorId: string, key: KvKey): Promise<void>;
46+
kvDeleteBatch(actorId: string, key: KvKey[]): Promise<void>;
2747

2848
// Schedule
29-
setAlarm(timestamp: number): Promise<void>;
49+
setAlarm(actorId: string, timestamp: number): Promise<void>;
3050

3151
// TODO:
3252
//destroy(): Promise<void>;

0 commit comments

Comments
 (0)