Skip to content

Commit 88c1b38

Browse files
committed
Ensure realtime subscription stops when runs are finished, and add an onComplete handle to use realtime hooks
1 parent ddd44a0 commit 88c1b38

File tree

7 files changed

+235
-88
lines changed

7 files changed

+235
-88
lines changed

.changeset/lemon-cherries-greet.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Realtime streams now powered by electric. Also, this change fixes a realtime bug that was causing too many re-renders, even on records that didn't change

docs/frontend/react-hooks/realtime.mdx

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,31 @@ export function MyComponent({
6262
}
6363
```
6464

65+
You can supply an `onComplete` callback to the `useRealtimeRun` hook to be called when the run is completed or errored. This is useful if you want to perform some action when the run is completed, like navigating to a different page or showing a notification.
66+
67+
```tsx
68+
import { useRealtimeRun } from "@trigger.dev/react-hooks";
69+
70+
export function MyComponent({
71+
runId,
72+
publicAccessToken,
73+
}: {
74+
runId: string;
75+
publicAccessToken: string;
76+
}) {
77+
const { run, error } = useRealtimeRun(runId, {
78+
accessToken: publicAccessToken,
79+
onComplete: (run, error) => {
80+
console.log("Run completed", run);
81+
},
82+
});
83+
84+
if (error) return <div>Error: {error.message}</div>;
85+
86+
return <div>Run: {run.id}</div>;
87+
}
88+
```
89+
6590
See our [Realtime documentation](/realtime) for more information about the type of the run object and more.
6691

6792
### useRealtimeRunsWithTag

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 111 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ import {
1515
} from "../utils/ioSerialization.js";
1616
import { ApiError } from "./errors.js";
1717
import { ApiClient } from "./index.js";
18-
import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js";
18+
import {
19+
AsyncIterableStream,
20+
createAsyncIterableReadable,
21+
createAsyncIterableStream,
22+
zodShapeStream,
23+
} from "./stream.js";
1924
import { EventSourceParserStream } from "eventsource-parser/stream";
2025

2126
export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
@@ -82,25 +87,42 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
8287
url: string,
8388
options?: RunShapeStreamOptions
8489
): RunSubscription<TRunTypes> {
90+
const abortController = new AbortController();
91+
8592
const version1 = new SSEStreamSubscriptionFactory(
8693
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
8794
{
8895
headers: options?.headers,
89-
signal: options?.signal,
96+
signal: abortController.signal,
9097
}
9198
);
9299

93100
const version2 = new ElectricStreamSubscriptionFactory(
94101
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
95102
{
96103
headers: options?.headers,
97-
signal: options?.signal,
104+
signal: abortController.signal,
98105
}
99106
);
100107

108+
// If the user supplied AbortSignal is aborted, we should abort the internal controller
109+
options?.signal?.addEventListener(
110+
"abort",
111+
() => {
112+
if (!abortController.signal.aborted) {
113+
abortController.abort();
114+
}
115+
},
116+
{ once: true }
117+
);
118+
101119
const $options: RunSubscriptionOptions = {
102-
runShapeStream: zodShapeStream(SubscribeRunRawShape, url, options),
120+
runShapeStream: zodShapeStream(SubscribeRunRawShape, url, {
121+
...options,
122+
signal: abortController.signal,
123+
}),
103124
streamFactory: new VersionedStreamSubscriptionFactory(version1, version2),
125+
abortController,
104126
...options,
105127
};
106128

@@ -218,11 +240,10 @@ export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFact
218240
throw new Error("runId and streamKey are required");
219241
}
220242

221-
const url = `${baseUrl ?? this.baseUrl}/realtime/v2/streams/${runId}/${streamKey}`;
222-
223-
console.log("Creating ElectricStreamSubscription with URL:", url);
224-
225-
return new ElectricStreamSubscription(url, this.options);
243+
return new ElectricStreamSubscription(
244+
`${baseUrl ?? this.baseUrl}/realtime/v2/streams/${runId}/${streamKey}`,
245+
this.options
246+
);
226247
}
227248
}
228249

@@ -264,39 +285,48 @@ export interface RunShapeProvider {
264285
export type RunSubscriptionOptions = RunShapeStreamOptions & {
265286
runShapeStream: ReadableStream<SubscribeRunRawShape>;
266287
streamFactory: StreamSubscriptionFactory;
288+
abortController: AbortController;
267289
};
268290

269291
export class RunSubscription<TRunTypes extends AnyRunTypes> {
270-
private abortController: AbortController;
271292
private unsubscribeShape?: () => void;
272293
private stream: AsyncIterableStream<RunShape<TRunTypes>>;
273294
private packetCache = new Map<string, any>();
274295
private _closeOnComplete: boolean;
275296
private _isRunComplete = false;
276297

277298
constructor(private options: RunSubscriptionOptions) {
278-
this.abortController = new AbortController();
279299
this._closeOnComplete =
280300
typeof options.closeOnComplete === "undefined" ? true : options.closeOnComplete;
281301

282-
this.stream = createAsyncIterableStream(this.options.runShapeStream, {
283-
transform: async (chunk, controller) => {
284-
const run = await this.transformRunShape(chunk);
302+
this.stream = createAsyncIterableReadable(
303+
this.options.runShapeStream,
304+
{
305+
transform: async (chunk, controller) => {
306+
const run = await this.transformRunShape(chunk);
285307

286-
controller.enqueue(run);
308+
controller.enqueue(run);
287309

288-
this._isRunComplete = !!run.finishedAt;
310+
this._isRunComplete = !!run.finishedAt;
289311

290-
if (this._closeOnComplete && this._isRunComplete && !this.abortController.signal.aborted) {
291-
this.abortController.abort();
292-
}
312+
if (
313+
this._closeOnComplete &&
314+
this._isRunComplete &&
315+
!this.options.abortController.signal.aborted
316+
) {
317+
console.log("Closing stream because run is complete");
318+
319+
this.options.abortController.abort();
320+
}
321+
},
293322
},
294-
});
323+
this.options.abortController.signal
324+
);
295325
}
296326

297327
unsubscribe(): void {
298-
if (!this.abortController.signal.aborted) {
299-
this.abortController.abort();
328+
if (!this.options.abortController.signal.aborted) {
329+
this.options.abortController.abort();
300330
}
301331
this.unsubscribeShape?.();
302332
}
@@ -315,60 +345,68 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
315345
// Keep track of which streams we've already subscribed to
316346
const activeStreams = new Set<string>();
317347

318-
return createAsyncIterableStream(this.stream, {
319-
transform: async (run, controller) => {
320-
controller.enqueue({
321-
type: "run",
322-
run,
323-
});
324-
325-
// Check for stream metadata
326-
if (run.metadata && "$$streams" in run.metadata && Array.isArray(run.metadata.$$streams)) {
327-
for (const streamKey of run.metadata.$$streams) {
328-
if (typeof streamKey !== "string") {
329-
continue;
330-
}
331-
332-
if (!activeStreams.has(streamKey)) {
333-
activeStreams.add(streamKey);
334-
335-
const subscription = this.options.streamFactory.createSubscription(
336-
run.metadata,
337-
run.id,
338-
streamKey,
339-
this.options.client?.baseUrl
340-
);
341-
342-
const stream = await subscription.subscribe();
343-
344-
// Create the pipeline and start it
345-
stream
346-
.pipeThrough(
347-
new TransformStream({
348-
transform(chunk, controller) {
349-
controller.enqueue({
350-
type: streamKey,
351-
chunk: chunk as TStreams[typeof streamKey],
352-
run,
353-
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
354-
},
355-
})
356-
)
357-
.pipeTo(
358-
new WritableStream({
359-
write(chunk) {
360-
controller.enqueue(chunk);
361-
},
362-
})
363-
)
364-
.catch((error) => {
365-
console.error(`Error in stream ${streamKey}:`, error);
366-
});
348+
return createAsyncIterableReadable(
349+
this.stream,
350+
{
351+
transform: async (run, controller) => {
352+
controller.enqueue({
353+
type: "run",
354+
run,
355+
});
356+
357+
// Check for stream metadata
358+
if (
359+
run.metadata &&
360+
"$$streams" in run.metadata &&
361+
Array.isArray(run.metadata.$$streams)
362+
) {
363+
for (const streamKey of run.metadata.$$streams) {
364+
if (typeof streamKey !== "string") {
365+
continue;
366+
}
367+
368+
if (!activeStreams.has(streamKey)) {
369+
activeStreams.add(streamKey);
370+
371+
const subscription = this.options.streamFactory.createSubscription(
372+
run.metadata,
373+
run.id,
374+
streamKey,
375+
this.options.client?.baseUrl
376+
);
377+
378+
const stream = await subscription.subscribe();
379+
380+
// Create the pipeline and start it
381+
stream
382+
.pipeThrough(
383+
new TransformStream({
384+
transform(chunk, controller) {
385+
controller.enqueue({
386+
type: streamKey,
387+
chunk: chunk as TStreams[typeof streamKey],
388+
run,
389+
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
390+
},
391+
})
392+
)
393+
.pipeTo(
394+
new WritableStream({
395+
write(chunk) {
396+
controller.enqueue(chunk);
397+
},
398+
})
399+
)
400+
.catch((error) => {
401+
console.error(`Error in stream ${streamKey}:`, error);
402+
});
403+
}
367404
}
368405
}
369-
}
406+
},
370407
},
371-
});
408+
this.options.abortController.signal
409+
);
372410
}
373411

374412
private async transformRunShape(row: SubscribeRunRawShape): Promise<RunShape<TRunTypes>> {

packages/core/src/v3/apiClient/stream.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,36 @@ export function createAsyncIterableStream<S, T>(
7070
return transformedStream;
7171
}
7272

73+
export function createAsyncIterableReadable<S, T>(
74+
source: ReadableStream<S>,
75+
transformer: Transformer<S, T>,
76+
signal: AbortSignal
77+
): AsyncIterableStream<T> {
78+
return new ReadableStream<T>({
79+
async start(controller) {
80+
const transformedStream = source.pipeThrough(new TransformStream(transformer));
81+
const reader = transformedStream.getReader();
82+
83+
signal.addEventListener("abort", () => {
84+
queueMicrotask(() => {
85+
reader.cancel();
86+
controller.close();
87+
});
88+
});
89+
90+
while (true) {
91+
const { done, value } = await reader.read();
92+
if (done) {
93+
controller.close();
94+
break;
95+
}
96+
97+
controller.enqueue(value);
98+
}
99+
},
100+
}) as AsyncIterableStream<T>;
101+
}
102+
73103
class ReadableShapeStream<T extends Row<unknown> = Row> {
74104
readonly #stream: ShapeStreamInterface<T>;
75105
readonly #currentState: Map<string, T> = new Map();

0 commit comments

Comments
 (0)