@@ -15,7 +15,12 @@ import {
15
15
} from "../utils/ioSerialization.js" ;
16
16
import { ApiError } from "./errors.js" ;
17
17
import { ApiClient } from "./index.js" ;
18
- import { AsyncIterableStream , createAsyncIterableStream , zodShapeStream } from "./stream.js" ;
18
+ import {
19
+ AsyncIterableStream ,
20
+ createAsyncIterableReadable ,
21
+ createAsyncIterableStream ,
22
+ zodShapeStream ,
23
+ } from "./stream.js" ;
19
24
import { EventSourceParserStream } from "eventsource-parser/stream" ;
20
25
21
26
export type RunShape < TRunTypes extends AnyRunTypes > = TRunTypes extends AnyRunTypes
@@ -82,25 +87,42 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
82
87
url : string ,
83
88
options ?: RunShapeStreamOptions
84
89
) : RunSubscription < TRunTypes > {
90
+ const abortController = new AbortController ( ) ;
91
+
85
92
const version1 = new SSEStreamSubscriptionFactory (
86
93
getEnvVar ( "TRIGGER_STREAM_URL" , getEnvVar ( "TRIGGER_API_URL" ) ) ?? "https://api.trigger.dev" ,
87
94
{
88
95
headers : options ?. headers ,
89
- signal : options ? .signal ,
96
+ signal : abortController . signal ,
90
97
}
91
98
) ;
92
99
93
100
const version2 = new ElectricStreamSubscriptionFactory (
94
101
getEnvVar ( "TRIGGER_STREAM_URL" , getEnvVar ( "TRIGGER_API_URL" ) ) ?? "https://api.trigger.dev" ,
95
102
{
96
103
headers : options ?. headers ,
97
- signal : options ? .signal ,
104
+ signal : abortController . signal ,
98
105
}
99
106
) ;
100
107
108
+ // If the user supplied AbortSignal is aborted, we should abort the internal controller
109
+ options ?. signal ?. addEventListener (
110
+ "abort" ,
111
+ ( ) => {
112
+ if ( ! abortController . signal . aborted ) {
113
+ abortController . abort ( ) ;
114
+ }
115
+ } ,
116
+ { once : true }
117
+ ) ;
118
+
101
119
const $options : RunSubscriptionOptions = {
102
- runShapeStream : zodShapeStream ( SubscribeRunRawShape , url , options ) ,
120
+ runShapeStream : zodShapeStream ( SubscribeRunRawShape , url , {
121
+ ...options ,
122
+ signal : abortController . signal ,
123
+ } ) ,
103
124
streamFactory : new VersionedStreamSubscriptionFactory ( version1 , version2 ) ,
125
+ abortController,
104
126
...options ,
105
127
} ;
106
128
@@ -218,11 +240,10 @@ export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFact
218
240
throw new Error ( "runId and streamKey are required" ) ;
219
241
}
220
242
221
- const url = `${ baseUrl ?? this . baseUrl } /realtime/v2/streams/${ runId } /${ streamKey } ` ;
222
-
223
- console . log ( "Creating ElectricStreamSubscription with URL:" , url ) ;
224
-
225
- return new ElectricStreamSubscription ( url , this . options ) ;
243
+ return new ElectricStreamSubscription (
244
+ `${ baseUrl ?? this . baseUrl } /realtime/v2/streams/${ runId } /${ streamKey } ` ,
245
+ this . options
246
+ ) ;
226
247
}
227
248
}
228
249
@@ -264,39 +285,48 @@ export interface RunShapeProvider {
264
285
export type RunSubscriptionOptions = RunShapeStreamOptions & {
265
286
runShapeStream : ReadableStream < SubscribeRunRawShape > ;
266
287
streamFactory : StreamSubscriptionFactory ;
288
+ abortController : AbortController ;
267
289
} ;
268
290
269
291
export class RunSubscription < TRunTypes extends AnyRunTypes > {
270
- private abortController : AbortController ;
271
292
private unsubscribeShape ?: ( ) => void ;
272
293
private stream : AsyncIterableStream < RunShape < TRunTypes > > ;
273
294
private packetCache = new Map < string , any > ( ) ;
274
295
private _closeOnComplete : boolean ;
275
296
private _isRunComplete = false ;
276
297
277
298
constructor ( private options : RunSubscriptionOptions ) {
278
- this . abortController = new AbortController ( ) ;
279
299
this . _closeOnComplete =
280
300
typeof options . closeOnComplete === "undefined" ? true : options . closeOnComplete ;
281
301
282
- this . stream = createAsyncIterableStream ( this . options . runShapeStream , {
283
- transform : async ( chunk , controller ) => {
284
- const run = await this . transformRunShape ( chunk ) ;
302
+ this . stream = createAsyncIterableReadable (
303
+ this . options . runShapeStream ,
304
+ {
305
+ transform : async ( chunk , controller ) => {
306
+ const run = await this . transformRunShape ( chunk ) ;
285
307
286
- controller . enqueue ( run ) ;
308
+ controller . enqueue ( run ) ;
287
309
288
- this . _isRunComplete = ! ! run . finishedAt ;
310
+ this . _isRunComplete = ! ! run . finishedAt ;
289
311
290
- if ( this . _closeOnComplete && this . _isRunComplete && ! this . abortController . signal . aborted ) {
291
- this . abortController . abort ( ) ;
292
- }
312
+ if (
313
+ this . _closeOnComplete &&
314
+ this . _isRunComplete &&
315
+ ! this . options . abortController . signal . aborted
316
+ ) {
317
+ console . log ( "Closing stream because run is complete" ) ;
318
+
319
+ this . options . abortController . abort ( ) ;
320
+ }
321
+ } ,
293
322
} ,
294
- } ) ;
323
+ this . options . abortController . signal
324
+ ) ;
295
325
}
296
326
297
327
unsubscribe ( ) : void {
298
- if ( ! this . abortController . signal . aborted ) {
299
- this . abortController . abort ( ) ;
328
+ if ( ! this . options . abortController . signal . aborted ) {
329
+ this . options . abortController . abort ( ) ;
300
330
}
301
331
this . unsubscribeShape ?.( ) ;
302
332
}
@@ -315,60 +345,68 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
315
345
// Keep track of which streams we've already subscribed to
316
346
const activeStreams = new Set < string > ( ) ;
317
347
318
- return createAsyncIterableStream ( this . stream , {
319
- transform : async ( run , controller ) => {
320
- controller . enqueue ( {
321
- type : "run" ,
322
- run,
323
- } ) ;
324
-
325
- // Check for stream metadata
326
- if ( run . metadata && "$$streams" in run . metadata && Array . isArray ( run . metadata . $$streams ) ) {
327
- for ( const streamKey of run . metadata . $$streams ) {
328
- if ( typeof streamKey !== "string" ) {
329
- continue ;
330
- }
331
-
332
- if ( ! activeStreams . has ( streamKey ) ) {
333
- activeStreams . add ( streamKey ) ;
334
-
335
- const subscription = this . options . streamFactory . createSubscription (
336
- run . metadata ,
337
- run . id ,
338
- streamKey ,
339
- this . options . client ?. baseUrl
340
- ) ;
341
-
342
- const stream = await subscription . subscribe ( ) ;
343
-
344
- // Create the pipeline and start it
345
- stream
346
- . pipeThrough (
347
- new TransformStream ( {
348
- transform ( chunk , controller ) {
349
- controller . enqueue ( {
350
- type : streamKey ,
351
- chunk : chunk as TStreams [ typeof streamKey ] ,
352
- run,
353
- } as StreamPartResult < RunShape < TRunTypes > , TStreams > ) ;
354
- } ,
355
- } )
356
- )
357
- . pipeTo (
358
- new WritableStream ( {
359
- write ( chunk ) {
360
- controller . enqueue ( chunk ) ;
361
- } ,
362
- } )
363
- )
364
- . catch ( ( error ) => {
365
- console . error ( `Error in stream ${ streamKey } :` , error ) ;
366
- } ) ;
348
+ return createAsyncIterableReadable (
349
+ this . stream ,
350
+ {
351
+ transform : async ( run , controller ) => {
352
+ controller . enqueue ( {
353
+ type : "run" ,
354
+ run,
355
+ } ) ;
356
+
357
+ // Check for stream metadata
358
+ if (
359
+ run . metadata &&
360
+ "$$streams" in run . metadata &&
361
+ Array . isArray ( run . metadata . $$streams )
362
+ ) {
363
+ for ( const streamKey of run . metadata . $$streams ) {
364
+ if ( typeof streamKey !== "string" ) {
365
+ continue ;
366
+ }
367
+
368
+ if ( ! activeStreams . has ( streamKey ) ) {
369
+ activeStreams . add ( streamKey ) ;
370
+
371
+ const subscription = this . options . streamFactory . createSubscription (
372
+ run . metadata ,
373
+ run . id ,
374
+ streamKey ,
375
+ this . options . client ?. baseUrl
376
+ ) ;
377
+
378
+ const stream = await subscription . subscribe ( ) ;
379
+
380
+ // Create the pipeline and start it
381
+ stream
382
+ . pipeThrough (
383
+ new TransformStream ( {
384
+ transform ( chunk , controller ) {
385
+ controller . enqueue ( {
386
+ type : streamKey ,
387
+ chunk : chunk as TStreams [ typeof streamKey ] ,
388
+ run,
389
+ } as StreamPartResult < RunShape < TRunTypes > , TStreams > ) ;
390
+ } ,
391
+ } )
392
+ )
393
+ . pipeTo (
394
+ new WritableStream ( {
395
+ write ( chunk ) {
396
+ controller . enqueue ( chunk ) ;
397
+ } ,
398
+ } )
399
+ )
400
+ . catch ( ( error ) => {
401
+ console . error ( `Error in stream ${ streamKey } :` , error ) ;
402
+ } ) ;
403
+ }
367
404
}
368
405
}
369
- }
406
+ } ,
370
407
} ,
371
- } ) ;
408
+ this . options . abortController . signal
409
+ ) ;
372
410
}
373
411
374
412
private async transformRunShape ( row : SubscribeRunRawShape ) : Promise < RunShape < TRunTypes > > {
0 commit comments