Skip to content

Commit ab432a2

Browse files
authored
Fix realtime re-subscribing stale data issue (#2162)
* Fix realtime re-subscribing stale data issue Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended * removed logs
1 parent e12c82b commit ab432a2

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

.changeset/wicked-ads-walk.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended

packages/core/src/v3/apiClient/stream.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
114114
},
115115
});
116116

117+
let updatedKeys = new Set<string>();
118+
117119
// Create the transformed stream that processes messages and emits complete rows
118120
this.#changeStream = createAsyncIterableStream(source, {
119121
transform: (messages, controller) => {
@@ -122,7 +124,7 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
122124
}
123125

124126
try {
125-
const updatedKeys = new Set<string>();
127+
let isUpToDate = false;
126128

127129
for (const message of messages) {
128130
if (isChangeMessage(message)) {
@@ -147,18 +149,23 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
147149
if (message.headers.control === "must-refetch") {
148150
this.#currentState.clear();
149151
this.#error = false;
152+
} else if (message.headers.control === "up-to-date") {
153+
isUpToDate = true;
150154
}
151155
}
152156
}
153157

154158
// Now enqueue only one updated row per key, after all messages have been processed.
155-
if (!this.#isStreamClosed) {
159+
// If the stream is not up to date, we don't want to enqueue any rows.
160+
if (!this.#isStreamClosed && isUpToDate) {
156161
for (const key of updatedKeys) {
157162
const finalRow = this.#currentState.get(key);
158163
if (finalRow) {
159164
controller.enqueue(finalRow);
160165
}
161166
}
167+
168+
updatedKeys.clear();
162169
}
163170
} catch (error) {
164171
console.error("Error processing stream messages:", error);

references/hello-world/src/trigger/realtime.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { logger, runs, task } from "@trigger.dev/sdk";
22
import { helloWorldTask } from "./example.js";
3+
import { setTimeout } from "timers/promises";
34

45
export const realtimeByTagsTask = task({
56
id: "realtime-by-tags",
@@ -32,3 +33,29 @@ export const realtimeByTagsTask = task({
3233
};
3334
},
3435
});
36+
37+
export const realtimeUpToDateTask = task({
38+
id: "realtime-up-to-date",
39+
run: async ({ runId }: { runId?: string }) => {
40+
if (!runId) {
41+
const handle = await helloWorldTask.trigger(
42+
{ hello: "world" },
43+
{
44+
tags: ["hello-world", "realtime"],
45+
}
46+
);
47+
48+
runId = handle.id;
49+
}
50+
51+
logger.info("runId", { runId });
52+
53+
for await (const run of runs.subscribeToRun(runId, { stopOnCompletion: true })) {
54+
logger.info("run", { run });
55+
}
56+
57+
return {
58+
message: "Hello, world!",
59+
};
60+
},
61+
});

0 commit comments

Comments
 (0)