From 949153151c3e02bbfd43d75211cf1b9e898a16c7 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 10 Jun 2025 14:50:43 +0100 Subject: [PATCH 1/2] Fix realtime re-subscribing stale data issue Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended --- .changeset/wicked-ads-walk.md | 6 +++++ packages/core/src/v3/apiClient/stream.ts | 19 +++++++++++-- .../hello-world/src/trigger/realtime.ts | 27 +++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 .changeset/wicked-ads-walk.md diff --git a/.changeset/wicked-ads-walk.md b/.changeset/wicked-ads-walk.md new file mode 100644 index 0000000000..c9190c709f --- /dev/null +++ b/.changeset/wicked-ads-walk.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/react-hooks": patch +"@trigger.dev/core": patch +--- + +Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index 0e155fb33c..41a30d0979 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -114,6 +114,8 @@ class ReadableShapeStream = Row> { }, }); + let updatedKeys = new Set(); + // Create the transformed stream that processes messages and emits complete rows this.#changeStream = createAsyncIterableStream(source, { transform: (messages, controller) => { @@ -122,9 +124,13 @@ class ReadableShapeStream = Row> { } try { - const updatedKeys = new Set(); + let isUpToDate = false; + + console.log(`Processing ${messages.length} messages`); for (const message of messages) { + console.log("shape message", message); + if (isChangeMessage(message)) { const key = message.key; switch (message.headers.operation) { @@ -147,18 +153,27 @@ class ReadableShapeStream = Row> { if (message.headers.control === "must-refetch") { this.#currentState.clear(); this.#error = false; + } else if (message.headers.control === "up-to-date") { + console.log("Setting isUpToDate to true"); + isUpToDate = true; } } } // Now enqueue only one updated row per key, after all messages have been processed. - if (!this.#isStreamClosed) { + // If the stream is not up to date, we don't want to enqueue any rows. + if (!this.#isStreamClosed && isUpToDate) { for (const key of updatedKeys) { const finalRow = this.#currentState.get(key); if (finalRow) { + console.log("enqueueing finalRow", finalRow); controller.enqueue(finalRow); } } + + updatedKeys.clear(); + } else { + console.log("Not enqueuing any rows because the stream is not up to date"); } } catch (error) { console.error("Error processing stream messages:", error); diff --git a/references/hello-world/src/trigger/realtime.ts b/references/hello-world/src/trigger/realtime.ts index 951ec6735f..5a4d571c05 100644 --- a/references/hello-world/src/trigger/realtime.ts +++ b/references/hello-world/src/trigger/realtime.ts @@ -1,5 +1,6 @@ import { logger, runs, task } from "@trigger.dev/sdk"; import { helloWorldTask } from "./example.js"; +import { setTimeout } from "timers/promises"; export const realtimeByTagsTask = task({ id: "realtime-by-tags", @@ -32,3 +33,29 @@ export const realtimeByTagsTask = task({ }; }, }); + +export const realtimeUpToDateTask = task({ + id: "realtime-up-to-date", + run: async ({ runId }: { runId?: string }) => { + if (!runId) { + const handle = await helloWorldTask.trigger( + { hello: "world" }, + { + tags: ["hello-world", "realtime"], + } + ); + + runId = handle.id; + } + + logger.info("runId", { runId }); + + for await (const run of runs.subscribeToRun(runId, { stopOnCompletion: true })) { + logger.info("run", { run }); + } + + return { + message: "Hello, world!", + }; + }, +}); From d6d252fa64e9bcf2372cbeb91a97210631bdb41c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 10 Jun 2025 14:55:20 +0100 Subject: [PATCH 2/2] removed logs --- packages/core/src/v3/apiClient/stream.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index 41a30d0979..428fad8e94 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -126,11 +126,7 @@ class ReadableShapeStream = Row> { try { let isUpToDate = false; - console.log(`Processing ${messages.length} messages`); - for (const message of messages) { - console.log("shape message", message); - if (isChangeMessage(message)) { const key = message.key; switch (message.headers.operation) { @@ -154,7 +150,6 @@ class ReadableShapeStream = Row> { this.#currentState.clear(); this.#error = false; } else if (message.headers.control === "up-to-date") { - console.log("Setting isUpToDate to true"); isUpToDate = true; } } @@ -166,14 +161,11 @@ class ReadableShapeStream = Row> { for (const key of updatedKeys) { const finalRow = this.#currentState.get(key); if (finalRow) { - console.log("enqueueing finalRow", finalRow); controller.enqueue(finalRow); } } updatedKeys.clear(); - } else { - console.log("Not enqueuing any rows because the stream is not up to date"); } } catch (error) { console.error("Error processing stream messages:", error);