Skip to content

Commit ef0fc6b

Browse files
authored
Support for logically separated redis instances (#1647)
* Remove unnecessary disabled org check * Add support for separate redis servers for caching, rate limiter, marqs, and the pub/sub stuff (they all default to the previously used env vars for redis)
1 parent a24b2a7 commit ef0fc6b

14 files changed

+157
-132
lines changed

apps/webapp/app/env.server.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,93 @@ const EnvironmentSchema = z.object({
9191
REDIS_PASSWORD: z.string().optional(),
9292
REDIS_TLS_DISABLED: z.string().optional(),
9393

94+
RATE_LIMIT_REDIS_HOST: z
95+
.string()
96+
.optional()
97+
.transform((v) => v ?? process.env.REDIS_HOST),
98+
RATE_LIMIT_REDIS_READER_HOST: z
99+
.string()
100+
.optional()
101+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
102+
RATE_LIMIT_REDIS_READER_PORT: z.coerce
103+
.number()
104+
.optional()
105+
.transform(
106+
(v) =>
107+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
108+
),
109+
RATE_LIMIT_REDIS_PORT: z.coerce
110+
.number()
111+
.optional()
112+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
113+
RATE_LIMIT_REDIS_USERNAME: z
114+
.string()
115+
.optional()
116+
.transform((v) => v ?? process.env.REDIS_USERNAME),
117+
RATE_LIMIT_REDIS_PASSWORD: z
118+
.string()
119+
.optional()
120+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
121+
RATE_LIMIT_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
122+
123+
CACHE_REDIS_HOST: z
124+
.string()
125+
.optional()
126+
.transform((v) => v ?? process.env.REDIS_HOST),
127+
CACHE_REDIS_READER_HOST: z
128+
.string()
129+
.optional()
130+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
131+
CACHE_REDIS_READER_PORT: z.coerce
132+
.number()
133+
.optional()
134+
.transform(
135+
(v) =>
136+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
137+
),
138+
CACHE_REDIS_PORT: z.coerce
139+
.number()
140+
.optional()
141+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
142+
CACHE_REDIS_USERNAME: z
143+
.string()
144+
.optional()
145+
.transform((v) => v ?? process.env.REDIS_USERNAME),
146+
CACHE_REDIS_PASSWORD: z
147+
.string()
148+
.optional()
149+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
150+
CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
151+
152+
PUBSUB_REDIS_HOST: z
153+
.string()
154+
.optional()
155+
.transform((v) => v ?? process.env.REDIS_HOST),
156+
PUBSUB_REDIS_READER_HOST: z
157+
.string()
158+
.optional()
159+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
160+
PUBSUB_REDIS_READER_PORT: z.coerce
161+
.number()
162+
.optional()
163+
.transform(
164+
(v) =>
165+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
166+
),
167+
PUBSUB_REDIS_PORT: z.coerce
168+
.number()
169+
.optional()
170+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
171+
PUBSUB_REDIS_USERNAME: z
172+
.string()
173+
.optional()
174+
.transform((v) => v ?? process.env.REDIS_USERNAME),
175+
PUBSUB_REDIS_PASSWORD: z
176+
.string()
177+
.optional()
178+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
179+
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
180+
94181
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
95182
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
96183
DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS: z.coerce.number().int().positive().default(1),

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import { Duration } from "./rateLimiter.server";
55

66
export const apiRateLimiter = authorizationRateLimitMiddleware({
77
redis: {
8-
port: env.REDIS_PORT,
9-
host: env.REDIS_HOST,
10-
username: env.REDIS_USERNAME,
11-
password: env.REDIS_PASSWORD,
8+
port: env.RATE_LIMIT_REDIS_PORT,
9+
host: env.RATE_LIMIT_REDIS_HOST,
10+
username: env.RATE_LIMIT_REDIS_USERNAME,
11+
password: env.RATE_LIMIT_REDIS_PASSWORD,
1212
enableAutoPipelining: true,
13-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
13+
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
1414
},
1515
keyPrefix: "api",
1616
defaultLimiter: {

apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,12 @@ export function authorizationRateLimitMiddleware({
163163

164164
const redisClient = createRedisRateLimitClient(
165165
redis ?? {
166-
port: env.REDIS_PORT,
167-
host: env.REDIS_HOST,
168-
username: env.REDIS_USERNAME,
169-
password: env.REDIS_PASSWORD,
166+
port: env.RATE_LIMIT_REDIS_PORT,
167+
host: env.RATE_LIMIT_REDIS_HOST,
168+
username: env.RATE_LIMIT_REDIS_USERNAME,
169+
password: env.RATE_LIMIT_REDIS_PASSWORD,
170170
enableAutoPipelining: true,
171-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
171+
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
172172
}
173173
);
174174

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ function initializePlatformCache() {
3939
const redisCacheStore = new RedisCacheStore({
4040
connection: {
4141
keyPrefix: "tr:cache:platform:v3",
42-
port: env.REDIS_PORT,
43-
host: env.REDIS_HOST,
44-
username: env.REDIS_USERNAME,
45-
password: env.REDIS_PASSWORD,
42+
port: env.CACHE_REDIS_PORT,
43+
host: env.CACHE_REDIS_HOST,
44+
username: env.CACHE_REDIS_USERNAME,
45+
password: env.CACHE_REDIS_PASSWORD,
4646
enableAutoPipelining: true,
47-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
47+
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
4848
},
4949
});
5050

apps/webapp/app/services/rateLimiter.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ export class RateLimiter {
2828
redisClient ??
2929
createRedisRateLimitClient(
3030
redis ?? {
31-
port: env.REDIS_PORT,
32-
host: env.REDIS_HOST,
33-
username: env.REDIS_USERNAME,
34-
password: env.REDIS_PASSWORD,
31+
port: env.RATE_LIMIT_REDIS_PORT,
32+
host: env.RATE_LIMIT_REDIS_HOST,
33+
username: env.RATE_LIMIT_REDIS_USERNAME,
34+
password: env.RATE_LIMIT_REDIS_PASSWORD,
3535
enableAutoPipelining: true,
36-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
36+
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
3737
}
3838
),
3939
limiter,

apps/webapp/app/services/realtimeClientGlobal.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ function initializeRealtimeClient() {
88
electricOrigin: env.ELECTRIC_ORIGIN,
99
keyPrefix: "tr:realtime:concurrency",
1010
redis: {
11-
port: env.REDIS_PORT,
12-
host: env.REDIS_HOST,
13-
username: env.REDIS_USERNAME,
14-
password: env.REDIS_PASSWORD,
11+
port: env.RATE_LIMIT_REDIS_PORT,
12+
host: env.RATE_LIMIT_REDIS_HOST,
13+
username: env.RATE_LIMIT_REDIS_USERNAME,
14+
password: env.RATE_LIMIT_REDIS_PASSWORD,
1515
enableAutoPipelining: true,
16-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
16+
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
1717
},
1818
cachedLimitProvider: {
1919
async getCachedLimit(organizationId, defaultValue) {

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -991,26 +991,24 @@ export class EventRepository {
991991
async subscribeToTrace(traceId: string) {
992992
const redis = new Redis(this._config.redis);
993993

994-
const channel = `events:${traceId}:*`;
994+
const channel = `events:${traceId}`;
995995

996996
// Subscribe to the channel.
997-
await redis.psubscribe(channel);
997+
await redis.subscribe(channel);
998998

999999
// Increment the subscriber count.
10001000
this._subscriberCount++;
10011001

10021002
const eventEmitter = new EventEmitter();
10031003

10041004
// Define the message handler.
1005-
redis.on("pmessage", (pattern, channelReceived, message) => {
1006-
if (channelReceived.startsWith(`events:${traceId}:`)) {
1007-
eventEmitter.emit("message", message);
1008-
}
1005+
redis.on("message", (_, message) => {
1006+
eventEmitter.emit("message", message);
10091007
});
10101008

10111009
// Return a function that can be used to unsubscribe.
10121010
const unsubscribe = async () => {
1013-
await redis.punsubscribe(channel);
1011+
await redis.unsubscribe(channel);
10141012
redis.quit();
10151013
this._subscriberCount--;
10161014
};
@@ -1101,10 +1099,13 @@ export class EventRepository {
11011099

11021100
async #publishToRedis(events: CreatableEvent[]) {
11031101
if (events.length === 0) return;
1104-
const uniqueTraceSpans = new Set(events.map((e) => `events:${e.traceId}:${e.spanId}`));
1105-
for (const id of uniqueTraceSpans) {
1106-
await this._redisPublishClient.publish(id, new Date().toISOString());
1107-
}
1102+
const uniqueTraces = new Set(events.map((e) => `events:${e.traceId}`));
1103+
1104+
await Promise.allSettled(
1105+
Array.from(uniqueTraces).map((traceId) =>
1106+
this._redisPublishClient.publish(traceId, new Date().toISOString())
1107+
)
1108+
);
11081109
}
11091110

11101111
public generateTraceId() {
@@ -1142,12 +1143,12 @@ function initializeEventRepo() {
11421143
batchInterval: env.EVENTS_BATCH_INTERVAL,
11431144
retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION,
11441145
redis: {
1145-
port: env.REDIS_PORT,
1146-
host: env.REDIS_HOST,
1147-
username: env.REDIS_USERNAME,
1148-
password: env.REDIS_PASSWORD,
1146+
port: env.PUBSUB_REDIS_PORT,
1147+
host: env.PUBSUB_REDIS_HOST,
1148+
username: env.PUBSUB_REDIS_USERNAME,
1149+
password: env.PUBSUB_REDIS_PASSWORD,
11491150
enableAutoPipelining: true,
1150-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
1151+
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
11511152
},
11521153
});
11531154

apps/webapp/app/v3/marqs/devPubSub.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ export const devPubSub = singleton("devPubSub", initializeDevPubSub);
2121
function initializeDevPubSub() {
2222
const pubSub = new ZodPubSub({
2323
redis: {
24-
port: env.REDIS_PORT,
25-
host: env.REDIS_HOST,
26-
username: env.REDIS_USERNAME,
27-
password: env.REDIS_PASSWORD,
24+
port: env.PUBSUB_REDIS_PORT,
25+
host: env.PUBSUB_REDIS_HOST,
26+
username: env.PUBSUB_REDIS_USERNAME,
27+
password: env.PUBSUB_REDIS_PASSWORD,
2828
enableAutoPipelining: true,
29-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
29+
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
3030
},
3131
schema: messageCatalog,
3232
});

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ export type FairDequeuingStrategyOptions = {
3636
defaultOrgConcurrency: number;
3737
defaultEnvConcurrency: number;
3838
parentQueueLimit: number;
39-
checkForDisabledOrgs: boolean;
4039
tracer: Tracer;
4140
seed?: string;
4241
/**
@@ -88,7 +87,6 @@ const defaultBiases: FairDequeuingStrategyBiases = {
8887
export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
8988
private _cache: UnkeyCache<{
9089
concurrencyLimit: number;
91-
disabledConcurrency: boolean;
9290
}>;
9391

9492
private _rng: seedrandom.PRNG;
@@ -107,11 +105,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
107105
fresh: 60_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value.
108106
stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh.
109107
}),
110-
disabledConcurrency: new Namespace<boolean>(ctx, {
111-
stores: [memory],
112-
fresh: 30_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value.
113-
stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh.
114-
}),
115108
});
116109

117110
this._rng = seedrandom(options.seed);
@@ -512,16 +505,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
512505
return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => {
513506
span.setAttribute("org_id", orgId);
514507

515-
if (this.options.checkForDisabledOrgs) {
516-
const isDisabled = await this.#getConcurrencyDisabled(orgId);
517-
518-
if (isDisabled) {
519-
span.setAttribute("disabled", true);
520-
521-
return { current: 0, limit: 0 };
522-
}
523-
}
524-
525508
const [currentValue, limitValue] = await Promise.all([
526509
this.#getOrgCurrentConcurrency(orgId),
527510
this.#getOrgConcurrencyLimit(orgId),
@@ -587,22 +570,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
587570
});
588571
}
589572

590-
async #getConcurrencyDisabled(orgId: string) {
591-
return await startSpan(this.options.tracer, "getConcurrencyDisabled", async (span) => {
592-
span.setAttribute("org_id", orgId);
593-
594-
const key = this.options.keys.disabledConcurrencyLimitKey(orgId);
595-
596-
const result = await this._cache.disabledConcurrency.swr(key, async () => {
597-
const value = await this.options.redis.exists(key);
598-
599-
return Boolean(value);
600-
});
601-
602-
return typeof result.val === "boolean" ? result.val : false;
603-
});
604-
}
605-
606573
async #getOrgConcurrencyLimit(orgId: string) {
607574
return await startSpan(this.options.tracer, "getOrgConcurrencyLimit", async (span) => {
608575
span.setAttribute("org_id", orgId);

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,7 +1619,6 @@ function getMarQSClient() {
16191619
keys: keysProducer,
16201620
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
16211621
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
1622-
checkForDisabledOrgs: true,
16231622
biases: {
16241623
concurrencyLimitBias: env.MARQS_CONCURRENCY_LIMIT_BIAS,
16251624
availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS,
@@ -1635,7 +1634,6 @@ function getMarQSClient() {
16351634
keys: keysProducer,
16361635
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
16371636
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
1638-
checkForDisabledOrgs: false,
16391637
biases: {
16401638
concurrencyLimitBias: 0.0,
16411639
availableCapacityBias: 0.0,

apps/webapp/app/v3/marqs/v2.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ function getMarQSClient() {
8181
keys: new MarQSV2KeyProducer(KEY_PREFIX),
8282
defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY,
8383
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
84-
checkForDisabledOrgs: true,
8584
}),
8685
envQueuePriorityStrategy: new NoopFairDequeuingStrategy(), // We don't use this in v2, since all queues go through the shared queue
8786
workers: 0,

apps/webapp/app/v3/services/projectPubSub.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ export const projectPubSub = singleton("projectPubSub", initializeProjectPubSub)
2222
function initializeProjectPubSub() {
2323
const pubSub = new ZodPubSub({
2424
redis: {
25-
port: env.REDIS_PORT,
26-
host: env.REDIS_HOST,
27-
username: env.REDIS_USERNAME,
28-
password: env.REDIS_PASSWORD,
25+
port: env.PUBSUB_REDIS_PORT,
26+
host: env.PUBSUB_REDIS_HOST,
27+
username: env.PUBSUB_REDIS_USERNAME,
28+
password: env.PUBSUB_REDIS_PASSWORD,
2929
enableAutoPipelining: true,
30-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
30+
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
3131
},
3232
schema: messageCatalog,
3333
});

0 commit comments

Comments
 (0)