Skip to content

Commit bf929a5

Browse files
committed
feat: add actor proxy support for HTTP fallback
1 parent a75a0c8 commit bf929a5

File tree

6 files changed

+720
-85
lines changed

6 files changed

+720
-85
lines changed

packages/actor-core/src/actor/errors.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,55 @@ export class UserError extends ActorError {
194194
});
195195
}
196196
}
197+
198+
// Proxy-related errors
199+
200+
export class MissingRequiredParameters extends ActorError {
201+
constructor(missingParams: string[]) {
202+
super(
203+
"missing_required_parameters",
204+
`Missing required parameters: ${missingParams.join(", ")}`,
205+
{ public: true }
206+
);
207+
}
208+
}
209+
210+
export class InvalidQueryJSON extends ActorError {
211+
constructor(error?: unknown) {
212+
super(
213+
"invalid_query_json",
214+
`Invalid query JSON: ${error}`,
215+
{ public: true, cause: error }
216+
);
217+
}
218+
}
219+
220+
export class InvalidQueryFormat extends ActorError {
221+
constructor(error?: unknown) {
222+
super(
223+
"invalid_query_format",
224+
`Invalid query format: ${error}`,
225+
{ public: true, cause: error }
226+
);
227+
}
228+
}
229+
230+
export class ActorNotFound extends ActorError {
231+
constructor(identifier?: string) {
232+
super(
233+
"actor_not_found",
234+
identifier ? `Actor not found: ${identifier}` : "Actor not found",
235+
{ public: true }
236+
);
237+
}
238+
}
239+
240+
export class ProxyError extends ActorError {
241+
constructor(operation: string, error?: unknown) {
242+
super(
243+
"proxy_error",
244+
`Error proxying ${operation}: ${error}`,
245+
{ public: true, cause: error }
246+
);
247+
}
248+
}

packages/actor-core/src/client/actor_conn.ts

Lines changed: 142 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ export class ActorConnRaw {
9595
private readonly supportedTransports: Transport[],
9696
private readonly serverTransports: Transport[],
9797
private readonly dynamicImports: DynamicImports,
98+
private readonly actorQuery: unknown,
9899
) {
99100
this.#keepNodeAliveInterval = setInterval(() => 60_000);
100101
}
@@ -115,34 +116,135 @@ export class ActorConnRaw {
115116
): Promise<Response> {
116117
logger().debug("action", { name, args });
117118

118-
// TODO: Add to queue if socket is not open
119+
// Check if we have an active websocket connection
120+
if (this.#transport) {
121+
// If we have an active connection, use the websocket RPC
122+
const rpcId = this.#rpcIdCounter;
123+
this.#rpcIdCounter += 1;
119124

120-
const rpcId = this.#rpcIdCounter;
121-
this.#rpcIdCounter += 1;
125+
const { promise, resolve, reject } =
126+
Promise.withResolvers<wsToClient.RpcResponseOk>();
127+
this.#rpcInFlight.set(rpcId, { name, resolve, reject });
122128

123-
const { promise, resolve, reject } =
124-
Promise.withResolvers<wsToClient.RpcResponseOk>();
125-
this.#rpcInFlight.set(rpcId, { name, resolve, reject });
126-
127-
this.#sendMessage({
128-
b: {
129-
rr: {
130-
i: rpcId,
131-
n: name,
132-
a: args,
129+
this.#sendMessage({
130+
b: {
131+
rr: {
132+
i: rpcId,
133+
n: name,
134+
a: args,
135+
},
133136
},
134-
},
135-
} satisfies wsToServer.ToServer);
137+
} satisfies wsToServer.ToServer);
136138

137-
// TODO: Throw error if disconnect is called
139+
// TODO: Throw error if disconnect is called
138140

139-
const { i: responseId, o: output } = await promise;
140-
if (responseId !== rpcId)
141-
throw new Error(
142-
`Request ID ${rpcId} does not match response ID ${responseId}`,
143-
);
141+
const { i: responseId, o: output } = await promise;
142+
if (responseId !== rpcId)
143+
throw new Error(
144+
`Request ID ${rpcId} does not match response ID ${responseId}`,
145+
);
146+
147+
return output as Response;
148+
} else {
149+
// If no websocket connection, use HTTP RPC via manager
150+
try {
151+
// Get the manager endpoint from the endpoint provided
152+
const managerEndpoint = this.endpoint.split('/manager/')[0];
153+
const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery));
154+
155+
const url = `${managerEndpoint}/actor/rpc/${name}?query=${actorQueryStr}`;
156+
logger().debug("=== CLIENT HTTP RPC: Sending request ===", {
157+
url,
158+
managerEndpoint,
159+
actorQuery: this.actorQuery,
160+
name,
161+
args
162+
});
163+
164+
try {
165+
const response = await fetch(url, {
166+
method: "POST",
167+
headers: {
168+
"Content-Type": "application/json",
169+
},
170+
body: JSON.stringify({
171+
a: args,
172+
}),
173+
});
144174

145-
return output as Response;
175+
logger().debug("=== CLIENT HTTP RPC: Response received ===", {
176+
status: response.status,
177+
ok: response.ok,
178+
headers: Object.fromEntries([...response.headers])
179+
});
180+
181+
if (!response.ok) {
182+
try {
183+
const errorData = await response.json();
184+
logger().error("=== CLIENT HTTP RPC: Error response ===", { errorData });
185+
throw new errors.ActionError(
186+
errorData.c || "RPC_ERROR",
187+
errorData.m || "RPC call failed",
188+
errorData.md,
189+
);
190+
} catch (parseError) {
191+
// If response is not JSON, get it as text and throw generic error
192+
const errorText = await response.text();
193+
logger().error("=== CLIENT HTTP RPC: Error parsing response ===", {
194+
errorText,
195+
parseError
196+
});
197+
throw new errors.ActionError(
198+
"RPC_ERROR",
199+
`RPC call failed: ${errorText}`,
200+
{},
201+
);
202+
}
203+
}
204+
205+
// Clone response to avoid consuming it
206+
const responseClone = response.clone();
207+
const responseText = await responseClone.text();
208+
logger().debug("=== CLIENT HTTP RPC: Response body ===", { responseText });
209+
210+
// Parse response body
211+
try {
212+
const responseData = JSON.parse(responseText);
213+
logger().debug("=== CLIENT HTTP RPC: Parsed response ===", { responseData });
214+
return responseData.o as Response;
215+
} catch (parseError) {
216+
logger().error("=== CLIENT HTTP RPC: Error parsing JSON ===", {
217+
responseText,
218+
parseError
219+
});
220+
throw new errors.ActionError(
221+
"RPC_ERROR",
222+
`Failed to parse response: ${parseError}`,
223+
{ responseText }
224+
);
225+
}
226+
} catch (fetchError) {
227+
logger().error("=== CLIENT HTTP RPC: Fetch error ===", {
228+
error: fetchError,
229+
url
230+
});
231+
throw new errors.ActionError(
232+
"RPC_ERROR",
233+
`Fetch failed: ${fetchError}`,
234+
{ cause: fetchError }
235+
);
236+
}
237+
} catch (error) {
238+
if (error instanceof errors.ActionError) {
239+
throw error;
240+
}
241+
throw new errors.ActionError(
242+
"RPC_ERROR",
243+
`Failed to execute RPC ${name}: ${error}`,
244+
{ cause: error }
245+
);
246+
}
247+
}
146248
}
147249

148250
//async #rpcHttp<Args extends Array<unknown> = unknown[], Response = unknown>(name: string, ...args: Args): Promise<Response> {
@@ -453,7 +555,17 @@ enc
453555
}
454556

455557
#buildConnUrl(transport: Transport): string {
456-
let url = `${this.endpoint}/connect/${transport}?encoding=${this.encodingKind}`;
558+
// Get the manager endpoint from the endpoint provided
559+
const managerEndpoint = this.endpoint.split('/manager/')[0];
560+
const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery));
561+
562+
logger().debug("=== Client building conn URL ===", {
563+
originalEndpoint: this.endpoint,
564+
managerEndpoint: managerEndpoint,
565+
transport: transport
566+
});
567+
568+
let url = `${managerEndpoint}/actor/connect/${transport}?encoding=${this.encodingKind}&query=${actorQueryStr}`;
457569

458570
if (this.params !== undefined) {
459571
const paramsStr = JSON.stringify(this.params);
@@ -469,6 +581,8 @@ enc
469581
if (transport === "websocket") {
470582
url = url.replace(/^http:/, "ws:").replace(/^https:/, "wss:");
471583
}
584+
585+
logger().debug("=== Client final conn URL ===", { url });
472586

473587
return url;
474588
}
@@ -617,7 +731,11 @@ enc
617731
if (!this.#connectionId || !this.#connectionToken)
618732
throw new errors.InternalError("Missing connection ID or token.");
619733

620-
let url = `${this.endpoint}/connections/${this.#connectionId}/message?encoding=${this.encodingKind}&connectionToken=${encodeURIComponent(this.#connectionToken)}`;
734+
// Get the manager endpoint from the endpoint provided
735+
const managerEndpoint = this.endpoint.split('/manager/')[0];
736+
const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery));
737+
738+
let url = `${managerEndpoint}/actor/connections/${this.#connectionId}/message?encoding=${this.encodingKind}&connectionToken=${encodeURIComponent(this.#connectionToken)}&query=${actorQueryStr}`;
621739

622740
// TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently.
623741
// TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests

packages/actor-core/src/client/client.ts

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -212,21 +212,18 @@ export class ClientRaw {
212212
params: opts?.params,
213213
});
214214

215-
const resJson = await this.#sendManagerRequest<
216-
ActorsRequest,
217-
ActorsResponse
218-
>("POST", "/manager/actors", {
219-
query: {
220-
getForId: {
221-
actorId,
222-
},
215+
const actorQuery = {
216+
getForId: {
217+
actorId,
223218
},
224-
});
219+
};
225220

221+
const managerEndpoint = await this.#managerEndpointPromise;
226222
const conn = await this.#createConn(
227-
resJson.endpoint,
223+
managerEndpoint,
228224
opts?.params,
229-
resJson.supportedTransports,
225+
["websocket", "sse"],
226+
actorQuery
230227
);
231228
return this.#createProxy(conn) as ActorConn<AD>;
232229
}
@@ -284,18 +281,18 @@ export class ClientRaw {
284281
create,
285282
});
286283

287-
let requestQuery;
284+
let actorQuery;
288285
if (opts?.noCreate) {
289286
// Use getForKey endpoint if noCreate is specified
290-
requestQuery = {
287+
actorQuery = {
291288
getForKey: {
292289
name,
293290
key: keyArray,
294291
},
295292
};
296293
} else {
297294
// Use getOrCreateForKey endpoint
298-
requestQuery = {
295+
actorQuery = {
299296
getOrCreateForKey: {
300297
name,
301298
key: keyArray,
@@ -304,17 +301,12 @@ export class ClientRaw {
304301
};
305302
}
306303

307-
const resJson = await this.#sendManagerRequest<
308-
ActorsRequest,
309-
ActorsResponse
310-
>("POST", "/manager/actors", {
311-
query: requestQuery,
312-
});
313-
304+
const managerEndpoint = await this.#managerEndpointPromise;
314305
const conn = await this.#createConn(
315-
resJson.endpoint,
306+
managerEndpoint,
316307
opts?.params,
317-
resJson.supportedTransports,
308+
["websocket", "sse"],
309+
actorQuery
318310
);
319311
return this.#createProxy(conn) as ActorConn<AD>;
320312
}
@@ -374,19 +366,16 @@ export class ClientRaw {
374366
create,
375367
});
376368

377-
const resJson = await this.#sendManagerRequest<
378-
ActorsRequest,
379-
ActorsResponse
380-
>("POST", "/manager/actors", {
381-
query: {
382-
create,
383-
},
384-
});
369+
const actorQuery = {
370+
create,
371+
};
385372

373+
const managerEndpoint = await this.#managerEndpointPromise;
386374
const conn = await this.#createConn(
387-
resJson.endpoint,
375+
managerEndpoint,
388376
opts?.params,
389-
resJson.supportedTransports,
377+
["websocket", "sse"],
378+
actorQuery
390379
);
391380
return this.#createProxy(conn) as ActorConn<AD>;
392381
}
@@ -395,6 +384,7 @@ export class ClientRaw {
395384
endpoint: string,
396385
params: unknown,
397386
serverTransports: Transport[],
387+
actorQuery: unknown,
398388
): Promise<ActorConnRaw> {
399389
const imports = await this.#dynamicImportsPromise;
400390

@@ -406,6 +396,7 @@ export class ClientRaw {
406396
this.#supportedTransports,
407397
serverTransports,
408398
imports,
399+
actorQuery,
409400
);
410401
this[ACTOR_CONNS_SYMBOL].add(conn);
411402
conn[CONNECT_SYMBOL]();

0 commit comments

Comments
 (0)