Skip to content

Commit 9dfd6a5

Browse files
authored
Rate limit alerts by channel for task run alerts using generic cell rate algo (#1679)
1 parent 5210d3a commit 9dfd6a5

7 files changed

+528
-21
lines changed

apps/webapp/app/env.server.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,39 @@ const EnvironmentSchema = z.object({
308308
ALERT_SMTP_SECURE: z.coerce.boolean().optional(),
309309
ALERT_SMTP_USER: z.string().optional(),
310310
ALERT_SMTP_PASSWORD: z.string().optional(),
311+
ALERT_RATE_LIMITER_EMISSION_INTERVAL: z.coerce.number().int().default(2_500),
312+
ALERT_RATE_LIMITER_BURST_TOLERANCE: z.coerce.number().int().default(10_000),
313+
ALERT_RATE_LIMITER_REDIS_HOST: z
314+
.string()
315+
.optional()
316+
.transform((v) => v ?? process.env.REDIS_HOST),
317+
ALERT_RATE_LIMITER_REDIS_READER_HOST: z
318+
.string()
319+
.optional()
320+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
321+
ALERT_RATE_LIMITER_REDIS_READER_PORT: z.coerce
322+
.number()
323+
.optional()
324+
.transform(
325+
(v) =>
326+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
327+
),
328+
ALERT_RATE_LIMITER_REDIS_PORT: z.coerce
329+
.number()
330+
.optional()
331+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
332+
ALERT_RATE_LIMITER_REDIS_USERNAME: z
333+
.string()
334+
.optional()
335+
.transform((v) => v ?? process.env.REDIS_USERNAME),
336+
ALERT_RATE_LIMITER_REDIS_PASSWORD: z
337+
.string()
338+
.optional()
339+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
340+
ALERT_RATE_LIMITER_REDIS_TLS_DISABLED: z
341+
.string()
342+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
343+
ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
311344

312345
MAX_SEQUENTIAL_INDEX_FAILURE_COUNT: z.coerce.number().default(96),
313346

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import Redis, { Cluster } from "ioredis";
2+
3+
/**
 * Options for configuring the GCRARateLimiter.
 */
export interface GCRARateLimiterOptions {
  /** An ioredis client (standalone or cluster) used to store limiter state. */
  redis: Redis | Cluster;
  /**
   * A string prefix to namespace keys in Redis.
   * Defaults to "gcra:ratelimit:".
   */
  keyPrefix?: string;
  /**
   * The minimum interval between requests (the emission interval) in milliseconds.
   * For example, 1000 ms for one request per second.
   */
  emissionInterval: number;
  /**
   * The burst tolerance in milliseconds. This represents how much "credit" can be
   * accumulated to allow short bursts beyond the average rate.
   * Roughly `burstTolerance / emissionInterval` extra requests are admitted on top of
   * the on-schedule one; e.g. with an emission interval of 1000 ms, a value of 3000
   * allows about 3 additional back-to-back requests.
   */
  burstTolerance: number;
  /**
   * Expiration for the Redis key in milliseconds.
   * Defaults to the larger of 60 seconds or (emissionInterval + burstTolerance).
   * A value shorter than (emissionInterval + burstTolerance) can let the key expire
   * while the stored arrival time is still in the future, which forgets consumed burst credit.
   */
  keyExpiration?: number;
}
32+
33+
/**
 * The result of a rate limit check.
 */
export interface RateLimitResult {
  /** Whether the request is allowed. */
  allowed: boolean;
  /**
   * Only set when `allowed` is false: the number of milliseconds the caller
   * should wait before retrying.
   */
  retryAfter?: number;
}
45+
46+
/**
 * A rate limiter using Redis and the Generic Cell Rate Algorithm (GCRA).
 *
 * The GCRA is implemented using a Lua script that runs atomically in Redis.
 *
 * When a request comes in, the algorithm:
 *  - Retrieves the current "Theoretical Arrival Time" (TAT) from Redis (or initializes it to now if missing).
 *  - If the current time is greater than or equal to the TAT, the request is allowed and the TAT is updated to now + emissionInterval.
 *  - Otherwise, if the current time plus the burst tolerance is at least the TAT, the request is allowed and the TAT is advanced by emissionInterval.
 *  - If neither condition is met, the request is rejected and a retry-after value (ms) is returned.
 */
export class GCRARateLimiter {
  private readonly redis: Redis | Cluster;
  private readonly keyPrefix: string;
  private readonly emissionInterval: number;
  private readonly burstTolerance: number;
  private readonly keyExpiration: number;

  /**
   * Stores the configuration and registers the `gcra` custom command on the given client.
   *
   * @param options Limiter configuration; see {@link GCRARateLimiterOptions}.
   */
  constructor(options: GCRARateLimiterOptions) {
    this.redis = options.redis;
    // `??` (not `||`) so that an explicitly empty prefix is honoured instead of being
    // silently replaced by the default.
    this.keyPrefix = options.keyPrefix ?? "gcra:ratelimit:";
    this.emissionInterval = options.emissionInterval;
    this.burstTolerance = options.burstTolerance;
    // Default expiration: at least 60 seconds or the sum of emissionInterval and burstTolerance.
    // `||` is deliberate here: a keyExpiration of 0 also falls back to the default,
    // because it would otherwise be passed to SET ... PX as a zero expiry.
    this.keyExpiration =
      options.keyExpiration || Math.max(60_000, this.emissionInterval + this.burstTolerance);

    // Define a custom Redis command 'gcra' that implements the GCRA algorithm as a
    // single Lua script, so the read-decide-write sequence is one server-side call.
    this.redis.defineCommand("gcra", {
      numberOfKeys: 1,
      lua: `
--[[
  GCRA Lua script
  KEYS[1]         - The rate limit key (e.g. "ratelimit:<identifier>")
  ARGV[1]         - Current time in ms (number)
  ARGV[2]         - Emission interval in ms (number)
  ARGV[3]         - Burst tolerance in ms (number)
  ARGV[4]         - Key expiration in ms (number)

  Returns: { allowedFlag, value }
    allowedFlag: 1 if allowed, 0 if rate-limited.
    value: 0 when allowed; if not allowed, the number of ms to wait.
]]--

local key = KEYS[1]
local now = tonumber(ARGV[1])
local emission_interval = tonumber(ARGV[2])
local burst_tolerance = tonumber(ARGV[3])
local expire = tonumber(ARGV[4])

-- Get the stored Theoretical Arrival Time (TAT) or default to 0.
local tat = tonumber(redis.call("GET", key) or 0)
if tat == 0 then
  tat = now
end

local allowed, new_tat, retry_after

if now >= tat then
  -- No delay: request is on schedule.
  new_tat = now + emission_interval
  allowed = true
elseif (now + burst_tolerance) >= tat then
  -- Within burst capacity: allow request.
  new_tat = tat + emission_interval
  allowed = true
else
  -- Request exceeds the allowed burst; calculate wait time.
  allowed = false
  retry_after = tat - (now + burst_tolerance)
end

if allowed then
  redis.call("SET", key, new_tat, "PX", expire)
  return {1, 0}
else
  return {0, retry_after}
end
`,
    });
  }

  /**
   * Checks whether a request associated with the given identifier is allowed.
   *
   * The current time is taken from this process (`Date.now()`) and passed to the script.
   * Any error raised by the Redis call is propagated to the caller, which decides
   * whether to fail open or fail closed.
   *
   * @param identifier A unique string identifying the subject of rate limiting (e.g. user ID, IP address, or domain).
   * @returns A promise that resolves to a RateLimitResult.
   *
   * @example
   * const result = await rateLimiter.check('user:12345');
   * if (!result.allowed) {
   *   // Tell the client to retry after result.retryAfter milliseconds.
   * }
   */
  async check(identifier: string): Promise<RateLimitResult> {
    const key = `${this.keyPrefix}${identifier}`;
    const now = Date.now();

    // The script returns [allowedFlag, value]:
    //  - allowedFlag: 1 if allowed; 0 if rejected.
    //  - value: 0 when allowed; if rejected, the number of ms to wait before retrying.
    // @ts-expect-error: The custom command is defined via defineCommand.
    const result: [number, number] = await this.redis.gcra(
      key,
      now,
      this.emissionInterval,
      this.burstTolerance,
      this.keyExpiration
    );

    const [allowedFlag, retryAfter] = result;
    return allowedFlag === 1 ? { allowed: true } : { allowed: false, retryAfter };
  }
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { env } from "~/env.server";
2+
import { createRedisClient } from "~/redis.server";
3+
import { GCRARateLimiter } from "./GCRARateLimiter.server";
4+
import { singleton } from "~/utils/singleton";
5+
import { logger } from "~/services/logger.server";
6+
7+
/** Process-wide alerts rate limiter, created once through `singleton`. */
export const alertsRateLimiter = singleton("alertsRateLimiter", initializeAlertsRateLimiter);

/**
 * Builds the GCRA rate limiter used for alerts.
 *
 * Creates a dedicated Redis client from the ALERT_RATE_LIMITER_REDIS_* settings and
 * configures the limiter with ALERT_RATE_LIMITER_EMISSION_INTERVAL and
 * ALERT_RATE_LIMITER_BURST_TOLERANCE (both in milliseconds).
 *
 * @returns A GCRARateLimiter bound to the newly created Redis client.
 */
function initializeAlertsRateLimiter(): GCRARateLimiter {
  const redis = createRedisClient("alerts:ratelimiter", {
    keyPrefix: "alerts:ratelimiter:",
    host: env.ALERT_RATE_LIMITER_REDIS_HOST,
    port: env.ALERT_RATE_LIMITER_REDIS_PORT,
    username: env.ALERT_RATE_LIMITER_REDIS_USERNAME,
    password: env.ALERT_RATE_LIMITER_REDIS_PASSWORD,
    // TLS and cluster mode are configured as strings in the environment.
    tlsDisabled: env.ALERT_RATE_LIMITER_REDIS_TLS_DISABLED === "true",
    clusterMode: env.ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED === "1",
  });

  logger.debug(`🚦 Initializing alerts rate limiter at host ${env.ALERT_RATE_LIMITER_REDIS_HOST}`, {
    emissionInterval: env.ALERT_RATE_LIMITER_EMISSION_INTERVAL,
    burstTolerance: env.ALERT_RATE_LIMITER_BURST_TOLERANCE,
  });

  return new GCRARateLimiter({
    redis,
    emissionInterval: env.ALERT_RATE_LIMITER_EMISSION_INTERVAL,
    burstTolerance: env.ALERT_RATE_LIMITER_BURST_TOLERANCE,
  });
}

apps/webapp/app/v3/services/alerts/deliverAlert.server.ts

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
import { TaskRunError, createJsonErrorObject } from "@trigger.dev/core/v3";
1010
import assertNever from "assert-never";
1111
import { subtle } from "crypto";
12-
import { Prisma, PrismaClientOrTransaction, prisma } from "~/db.server";
12+
import { Prisma, prisma, PrismaClientOrTransaction } from "~/db.server";
1313
import { env } from "~/env.server";
1414
import {
1515
OrgIntegrationRepository,
@@ -25,10 +25,12 @@ import { DeploymentPresenter } from "~/presenters/v3/DeploymentPresenter.server"
2525
import { sendAlertEmail } from "~/services/email.server";
2626
import { logger } from "~/services/logger.server";
2727
import { decryptSecret } from "~/services/secrets/secretStore.server";
28-
import { workerQueue } from "~/services/worker.server";
29-
import { BaseService } from "../baseService.server";
30-
import { FINAL_ATTEMPT_STATUSES } from "~/v3/taskStatus";
3128
import { commonWorker } from "~/v3/commonWorker.server";
29+
import { FINAL_ATTEMPT_STATUSES } from "~/v3/taskStatus";
30+
import { BaseService } from "../baseService.server";
31+
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
32+
import { ProjectAlertType } from "@trigger.dev/database";
33+
import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server";
3234

3335
type FoundAlert = Prisma.Result<
3436
typeof prisma.projectAlert,
@@ -1101,6 +1103,66 @@ export class DeliverAlertService extends BaseService {
11011103
availableAt: runAt,
11021104
});
11031105
}
1106+
1107+
/**
 * Creates a PENDING project alert for the given channel and enqueues its delivery.
 *
 * When `taskRunId` is provided, the alert is first checked against the alerts rate
 * limiter, keyed by `channelId`. If the limiter rejects it, a warning is logged and
 * no alert is created. Alerts without a `taskRunId` are never rate limited here.
 *
 * The rate limiter fails open: if the check itself throws, the error is logged and
 * the alert is still created and enqueued.
 *
 * @param params Identifiers for the alert: channel, project, environment, alert type,
 *   and optionally the deployment and/or task run the alert refers to.
 * @param db Prisma client or transaction used to create the alert record.
 */
static async createAndSendAlert(
  {
    channelId,
    projectId,
    environmentId,
    alertType,
    deploymentId,
    taskRunId,
  }: {
    channelId: string;
    projectId: string;
    environmentId: string;
    alertType: ProjectAlertType;
    deploymentId?: string;
    taskRunId?: string;
  },
  db: PrismaClientOrTransaction
) {
  if (taskRunId) {
    try {
      const result = await alertsRateLimiter.check(channelId);

      if (!result.allowed) {
        logger.warn("[DeliverAlert] Rate limited", {
          taskRunId,
          environmentId,
          alertType,
          channelId,
          result,
        });

        return;
      }
    } catch (error) {
      // Fail open: a rate limiter outage must not suppress alerts.
      logger.error("[DeliverAlert] Rate limiter error", {
        taskRunId,
        environmentId,
        alertType,
        channelId,
        error,
      });
    }
  }

  const alert = await db.projectAlert.create({
    data: {
      friendlyId: generateFriendlyId("alert"),
      channelId,
      projectId,
      environmentId,
      status: "PENDING",
      type: alertType,
      workerDeploymentId: deploymentId,
      taskRunId,
    },
  });

  await DeliverAlertService.enqueue(alert.id);
}
11041166
}
11051167

11061168
function isWebAPIPlatformError(error: unknown): error is WebAPIPlatformError {

apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,16 @@ export class PerformDeploymentAlertsService extends BaseService {
4646
deployment: WorkerDeployment,
4747
alertType: ProjectAlertType
4848
) {
49-
const alert = await this._prisma.projectAlert.create({
50-
data: {
51-
friendlyId: generateFriendlyId("alert"),
49+
await DeliverAlertService.createAndSendAlert(
50+
{
5251
channelId: alertChannel.id,
5352
projectId: deployment.projectId,
5453
environmentId: deployment.environmentId,
55-
status: "PENDING",
56-
type: alertType,
57-
workerDeploymentId: deployment.id,
54+
alertType,
55+
deploymentId: deployment.id,
5856
},
59-
});
60-
61-
await DeliverAlertService.enqueue(alert.id);
57+
this._prisma
58+
);
6259
}
6360

6461
static async enqueue(deploymentId: string, runAt?: Date) {

apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,16 @@ export class PerformTaskRunAlertsService extends BaseService {
4646
}
4747

4848
async #createAndSendAlert(alertChannel: ProjectAlertChannel, run: FoundRun) {
49-
const alert = await this._prisma.projectAlert.create({
50-
data: {
51-
friendlyId: generateFriendlyId("alert"),
49+
await DeliverAlertService.createAndSendAlert(
50+
{
5251
channelId: alertChannel.id,
5352
projectId: run.projectId,
5453
environmentId: run.runtimeEnvironmentId,
55-
status: "PENDING",
56-
type: "TASK_RUN",
54+
alertType: "TASK_RUN",
5755
taskRunId: run.id,
5856
},
59-
});
60-
61-
await DeliverAlertService.enqueue(alert.id);
57+
this._prisma
58+
);
6259
}
6360

6461
static async enqueue(runId: string, runAt?: Date) {

0 commit comments

Comments
 (0)