@@ -12,10 +12,9 @@ import {
12
12
createManagerInspectorRouter ,
13
13
type ManagerInspectorConnHandler ,
14
14
} from "@/inspector/manager" ;
15
- import type { UpgradeWebSocket } from "hono/ws" ;
16
15
import { ConnectQuerySchema } from "./protocol/query" ;
17
16
import * as errors from "@/actor/errors" ;
18
- import type { ActorQuery , ConnectQuery } from "./protocol/query" ;
17
+ import type { ActorQuery } from "./protocol/query" ;
19
18
import { assertUnreachable } from "@/actor/utils" ;
20
19
import invariant from "invariant" ;
21
20
import {
@@ -27,12 +26,11 @@ import {
27
26
handleWebSocketConnect ,
28
27
} from "@/actor/router_endpoints" ;
29
28
import { ManagerDriver } from "./driver" ;
30
- import { setUncaughtExceptionCaptureCallback } from "process" ;
31
29
import { Encoding , serialize } from "@/actor/protocol/serde" ;
32
30
import { deconstructError } from "@/common/utils" ;
33
31
import { WSContext } from "hono/ws" ;
34
32
import { ToClient } from "@/actor/protocol/message/to-client" ;
35
- import { upgradeWebSocket } from "hono/deno " ;
33
+ import { streamSSE } from "hono/streaming " ;
36
34
37
35
type ProxyMode =
38
36
| {
@@ -190,7 +188,6 @@ export function createManagerRouter(
190
188
} ;
191
189
192
190
// Send the error message to the client
193
- invariant ( encoding , "encoding should be defined" ) ;
194
191
const serialized = serialize ( errorMsg , encoding ) ;
195
192
ws . send ( serialized ) ;
196
193
@@ -213,8 +210,11 @@ export function createManagerRouter(
213
210
214
211
// Proxy SSE connection to actor
215
212
app . get ( "/actors/connect/sse" , async ( c ) => {
216
- logger ( ) . debug ( "sse connection request received" ) ;
213
+ let encoding : Encoding | undefined ;
217
214
try {
215
+ encoding = getRequestEncoding ( c . req ) ;
216
+ logger ( ) . debug ( "sse connection request received" , { encoding } ) ;
217
+
218
218
const params = ConnectQuerySchema . safeParse ( {
219
219
query : parseQuery ( c ) ,
220
220
encoding : c . req . query ( "encoding" ) ,
@@ -264,14 +264,55 @@ export function createManagerRouter(
264
264
assertUnreachable ( handler . proxyMode ) ;
265
265
}
266
266
} catch ( error ) {
267
- logger ( ) . error ( "error setting up sse proxy" , { error } ) ;
267
+ // If we receive an error during setup, we send the error and close the socket immediately
268
+ //
269
+ // We have to return the error over SSE since SSE clients cannot read vanilla HTTP responses
268
270
269
- // Use ProxyError if it's not already an ActorError
270
- if ( ! ( error instanceof errors . ActorError ) ) {
271
- throw new errors . ProxyError ( "SSE connection" , error ) ;
272
- } else {
273
- throw error ;
274
- }
271
+ const { code, message, metadata } = deconstructError ( error , logger ( ) , {
272
+ sseEvent : "setup" ,
273
+ } ) ;
274
+
275
+ return streamSSE ( c , async ( stream ) => {
276
+ try {
277
+ if ( encoding ) {
278
+ // Serialize and send the connection error
279
+ const errorMsg : ToClient = {
280
+ b : {
281
+ ce : {
282
+ c : code ,
283
+ m : message ,
284
+ md : metadata ,
285
+ } ,
286
+ } ,
287
+ } ;
288
+
289
+ // Send the error message to the client
290
+ const serialized = serialize ( errorMsg , encoding ) ;
291
+ await stream . writeSSE ( {
292
+ data :
293
+ typeof serialized === "string"
294
+ ? serialized
295
+ : Buffer . from ( serialized ) . toString ( "base64" ) ,
296
+ } ) ;
297
+ } else {
298
+ // We don't know the encoding, send an error and close
299
+ await stream . writeSSE ( {
300
+ data : code ,
301
+ event : "error" ,
302
+ } ) ;
303
+ }
304
+ } catch ( serializeError ) {
305
+ logger ( ) . error ( "failed to send error to sse client" , {
306
+ error : serializeError ,
307
+ } ) ;
308
+ await stream . writeSSE ( {
309
+ data : "internal error during error handling" ,
310
+ event : "error" ,
311
+ } ) ;
312
+ }
313
+
314
+ // Stream will exit completely once function exits
315
+ } ) ;
275
316
}
276
317
} ) ;
277
318
0 commit comments