Skip to content

Commit 62e3238

Browse files
authored
Add configurable redis TTL on realtime streams (#1725)
1 parent 5a250cf commit 62e3238

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

apps/webapp/app/env.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,11 @@ const EnvironmentSchema = z.object({
399399
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
400400

401401
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
402+
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
403+
REALTIME_STREAM_TTL: z.coerce
404+
.number()
405+
.int()
406+
.default(60 * 60 * 24), // 1 day in seconds
402407
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
403408
BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"),
404409
BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"),

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server";
33
import { logger } from "../logger.server";
44
import { StreamIngestor, StreamResponder } from "./types";
55
import { LineTransformStream } from "./utils.server";
6+
import { env } from "~/env.server";
67

78
export type RealtimeStreamsOptions = {
89
redis: RedisOptions | undefined;
@@ -152,10 +153,30 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
152153
value,
153154
});
154155

155-
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value);
156+
await redis.xadd(
157+
streamKey,
158+
"MAXLEN",
159+
"~",
160+
String(env.REALTIME_STREAM_MAX_LENGTH),
161+
"*",
162+
"data",
163+
value
164+
);
156165
}
157166

158-
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);
167+
// Send the END_SENTINEL and set TTL with a pipeline.
168+
const pipeline = redis.pipeline();
169+
pipeline.xadd(
170+
streamKey,
171+
"MAXLEN",
172+
"~",
173+
String(env.REALTIME_STREAM_MAX_LENGTH),
174+
"*",
175+
"data",
176+
END_SENTINEL
177+
);
178+
pipeline.expire(streamKey, env.REALTIME_STREAM_TTL);
179+
await pipeline.exec();
159180

160181
return new Response(null, { status: 200 });
161182
} catch (error) {

0 commit comments

Comments
 (0)