Skip to content

Commit 6b355ab

Browse files
authored
Upgrades and fixes to Realtime and Realtime streams (#1549)
* Fix streaming splits in realtime streams v2 * Add changeset * Skip all flaky tests 😡 * Improve the way we stream from tasks to the server * Improve the v1 realtime streams (Redis) * Turn on the relay realtime stream service * Improved the relay realtime cleanup * Fixed consuming realtime runs w/streams after the run is already finished * Remove some logs * Update changeset * Fixed runStream tests
1 parent 68d7139 commit 6b355ab

27 files changed

+920
-181
lines changed

.changeset/rude-walls-help.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/react-hooks": patch
4+
---
5+
6+
- Fixes an issue in streams where "chunks" could get split across multiple reads
7+
- Fixed stopping the run subscription after a run is finished, when using useRealtimeRun or useRealtimeRunWithStreams
8+
- Added an `onComplete` callback to `useRealtimeRun` and `useRealtimeRunWithStreams`
9+
- Optimized the run subscription to reduce unnecessary updates

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,15 @@ export class ApiRunListPresenter extends BasePresenter {
224224

225225
const data: ListRunResponseItem[] = await Promise.all(
226226
results.runs.map(async (run) => {
227-
const metadata = await parsePacket({
228-
data: run.metadata ?? undefined,
229-
dataType: run.metadataType,
230-
});
227+
const metadata = await parsePacket(
228+
{
229+
data: run.metadata ?? undefined,
230+
dataType: run.metadataType,
231+
},
232+
{
233+
filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
234+
}
235+
);
231236

232237
return {
233238
id: run.friendlyId,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ export class SpanPresenter extends BasePresenter {
216216

217217
const metadata = run.metadata
218218
? await prettyPrintPacket(run.metadata, run.metadataType, {
219-
filteredKeys: ["$$streams", "$$streamsVersion"],
219+
filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
220220
})
221221
: undefined;
222222

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4+
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
45
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
56
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
67

@@ -16,7 +17,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
1617
return new Response("No body provided", { status: 400 });
1718
}
1819

19-
return v1RealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
20+
return relayRealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
2021
}
2122

2223
export const loader = createLoaderApiRoute(
@@ -51,7 +52,7 @@ export const loader = createLoaderApiRoute(
5152
},
5253
},
5354
async ({ params, request, resource: run, authentication }) => {
54-
return v1RealtimeStreams.streamResponse(
55+
return relayRealtimeStreams.streamResponse(
5556
request,
5657
run.friendlyId,
5758
params.streamId,

apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder
3737
): Promise<Response> {
3838
try {
3939
const textStream = stream.pipeThrough(new TextDecoderStream());
40+
4041
const reader = textStream.getReader();
4142
let sequence = 0;
4243

4344
while (true) {
4445
const { done, value } = await reader.read();
4546

46-
if (done) {
47+
if (done || !value) {
4748
break;
4849
}
4950

@@ -53,25 +54,13 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder
5354
value,
5455
});
5556

56-
const chunks = value
57-
.split("\n")
58-
.filter((chunk) => chunk) // Remove empty lines
59-
.map((line) => {
60-
return {
61-
sequence: sequence++,
62-
value: line,
63-
};
64-
});
65-
66-
await this.options.prisma.realtimeStreamChunk.createMany({
67-
data: chunks.map((chunk) => {
68-
return {
69-
runId,
70-
key: streamId,
71-
sequence: chunk.sequence,
72-
value: chunk.value,
73-
};
74-
}),
57+
await this.options.prisma.realtimeStreamChunk.create({
58+
data: {
59+
runId,
60+
key: streamId,
61+
sequence: sequence++,
62+
value,
63+
},
7564
});
7665
}
7766

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis";
1+
import Redis, { RedisOptions } from "ioredis";
2+
import { AuthenticatedEnvironment } from "../apiAuth.server";
23
import { logger } from "../logger.server";
34
import { StreamIngestor, StreamResponder } from "./types";
4-
import { AuthenticatedEnvironment } from "../apiAuth.server";
5+
import { LineTransformStream } from "./utils.server";
56

67
export type RealtimeStreamsOptions = {
78
redis: RedisOptions | undefined;
@@ -56,7 +57,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
5657
controller.close();
5758
return;
5859
}
59-
controller.enqueue(`data: ${fields[1]}\n\n`);
60+
controller.enqueue(fields[1]);
6061

6162
if (signal.aborted) {
6263
controller.close();
@@ -88,7 +89,18 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
8889
cancel: async () => {
8990
await cleanup();
9091
},
91-
});
92+
})
93+
.pipeThrough(new LineTransformStream())
94+
.pipeThrough(
95+
new TransformStream({
96+
transform(chunk, controller) {
97+
for (const line of chunk) {
98+
controller.enqueue(`data: ${line}\n\n`);
99+
}
100+
},
101+
})
102+
)
103+
.pipeThrough(new TextEncoderStream());
92104

93105
async function cleanup() {
94106
if (isCleanedUp) return;
@@ -98,7 +110,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
98110

99111
signal.addEventListener("abort", cleanup);
100112

101-
return new Response(stream.pipeThrough(new TextEncoderStream()), {
113+
return new Response(stream, {
102114
headers: {
103115
"Content-Type": "text/event-stream",
104116
"Cache-Control": "no-cache",
@@ -119,50 +131,28 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
119131
try {
120132
await redis.quit();
121133
} catch (error) {
122-
logger.error("[RealtimeStreams][ingestData] Error in cleanup:", { error });
134+
logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
123135
}
124136
}
125137

126138
try {
127139
const textStream = stream.pipeThrough(new TextDecoderStream());
128140
const reader = textStream.getReader();
129141

130-
const batchSize = 10;
131-
let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = [];
132-
133142
while (true) {
134143
const { done, value } = await reader.read();
135144

136-
if (done) {
145+
if (done || !value) {
137146
break;
138147
}
139148

140-
logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value });
141-
142-
const lines = value.split("\n");
143-
144-
for (const line of lines) {
145-
if (line.trim()) {
146-
batchCommands.push([streamKey, "MAXLEN", "~", "2500", "*", "data", line]);
149+
logger.debug("[RedisRealtimeStreams][ingestData] Reading data", {
150+
streamKey,
151+
runId,
152+
value,
153+
});
147154

148-
if (batchCommands.length >= batchSize) {
149-
const pipeline = redis.pipeline();
150-
for (const args of batchCommands) {
151-
pipeline.xadd(...args);
152-
}
153-
await pipeline.exec();
154-
batchCommands = [];
155-
}
156-
}
157-
}
158-
}
159-
160-
if (batchCommands.length > 0) {
161-
const pipeline = redis.pipeline();
162-
for (const args of batchCommands) {
163-
pipeline.xadd(...args);
164-
}
165-
await pipeline.exec();
155+
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value);
166156
}
167157

168158
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);

0 commit comments

Comments
 (0)