From 4e73627c8fb4481cbc40b06036e26e36a3004b5f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 29 Jan 2025 16:18:15 +0000 Subject: [PATCH] Support redis/valkey cluster mode --- apps/webapp/app/env.server.ts | 3 + apps/webapp/app/redis.server.ts | 63 ++++++++++ .../app/services/apiRateLimit.server.ts | 4 +- ...authorizationRateLimitMiddleware.server.ts | 7 +- .../webapp/app/services/platform.v3.server.ts | 4 +- .../webapp/app/services/rateLimiter.server.ts | 13 +- .../app/services/realtimeClient.server.ts | 7 +- .../services/realtimeClientGlobal.server.ts | 4 +- .../services/unkey/redisCacheStore.server.ts | 13 +- apps/webapp/app/v3/eventRepository.server.ts | 14 +-- apps/webapp/app/v3/marqs/devPubSub.server.ts | 4 +- .../app/v3/services/projectPubSub.server.ts | 4 +- apps/webapp/app/v3/utils/zodPubSub.server.ts | 12 +- docker/docker-compose.yml | 111 +++++++++++++++--- 14 files changed, 204 insertions(+), 59 deletions(-) create mode 100644 apps/webapp/app/redis.server.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 2ba25e04ef..7051fb7367 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -119,6 +119,7 @@ const EnvironmentSchema = z.object({ .optional() .transform((v) => v ?? process.env.REDIS_PASSWORD), RATE_LIMIT_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), CACHE_REDIS_HOST: z .string() @@ -148,6 +149,7 @@ const EnvironmentSchema = z.object({ .optional() .transform((v) => v ?? process.env.REDIS_PASSWORD), CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), PUBSUB_REDIS_HOST: z .string() @@ -177,6 +179,7 @@ const EnvironmentSchema = z.object({ .optional() .transform((v) => v ?? process.env.REDIS_PASSWORD), PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10), DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/redis.server.ts b/apps/webapp/app/redis.server.ts new file mode 100644 index 0000000000..0cf6e00bd3 --- /dev/null +++ b/apps/webapp/app/redis.server.ts @@ -0,0 +1,63 @@ +import { Cluster, Redis, type ClusterNode, type ClusterOptions } from "ioredis"; +import { logger } from "./services/logger.server"; + +export type RedisWithClusterOptions = { + host?: string; + port?: number; + username?: string; + password?: string; + tlsDisabled?: boolean; + clusterMode?: boolean; + clusterOptions?: Omit; + keyPrefix?: string; +}; + +export type RedisClient = Redis | Cluster; + +export function createRedisClient( + connectionName: string, + options: RedisWithClusterOptions +): Redis | Cluster { + if (options.clusterMode) { + const nodes: ClusterNode[] = [ + { + host: options.host, + port: options.port, + }, + ]; + + logger.debug("Creating a redis cluster client", { + connectionName, + host: options.host, + port: options.port, + }); + + return new Redis.Cluster(nodes, { + ...options.clusterOptions, + redisOptions: { + connectionName, + keyPrefix: options.keyPrefix, + username: options.username, + password: options.password, + enableAutoPipelining: true, + ...(options.tlsDisabled ? {} : { tls: {} }), + }, + }); + } else { + logger.debug("Creating a redis client", { + connectionName, + host: options.host, + port: options.port, + }); + + return new Redis({ + connectionName, + host: options.host, + port: options.port, + username: options.username, + password: options.password, + enableAutoPipelining: true, + ...(options.tlsDisabled ? {} : { tls: {} }), + }); + } +} diff --git a/apps/webapp/app/services/apiRateLimit.server.ts b/apps/webapp/app/services/apiRateLimit.server.ts index 108f75cc8a..466aaa98b8 100644 --- a/apps/webapp/app/services/apiRateLimit.server.ts +++ b/apps/webapp/app/services/apiRateLimit.server.ts @@ -9,8 +9,8 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({ host: env.RATE_LIMIT_REDIS_HOST, username: env.RATE_LIMIT_REDIS_USERNAME, password: env.RATE_LIMIT_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true", + clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1", }, keyPrefix: "api", defaultLimiter: { diff --git a/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts b/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts index 6a6b6995d6..0ca4b80afd 100644 --- a/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts +++ b/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts @@ -9,6 +9,7 @@ import { env } from "~/env.server"; import { logger } from "./logger.server"; import { createRedisRateLimitClient, Duration, RateLimiter } from "./rateLimiter.server"; import { RedisCacheStore } from "./unkey/redisCacheStore.server"; +import { RedisWithClusterOptions } from "~/redis.server"; const DurationSchema = z.custom((value) => { if (typeof value !== "string") { @@ -54,7 +55,7 @@ export type RateLimiterConfig = z.infer; type LimitConfigOverrideFunction = (authorizationValue: string) => Promise; type Options = { - redis?: RedisOptions; + redis?: RedisWithClusterOptions; keyPrefix: string; pathMatchers: (RegExp | string)[]; pathWhiteList?: (RegExp | string)[]; @@ -167,8 +168,8 @@ export function authorizationRateLimitMiddleware({ host: env.RATE_LIMIT_REDIS_HOST, username: env.RATE_LIMIT_REDIS_USERNAME, password: env.RATE_LIMIT_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true", + clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1", } ); diff --git a/apps/webapp/app/services/platform.v3.server.ts b/apps/webapp/app/services/platform.v3.server.ts index 804dd4c52d..c4638fbff1 100644 --- a/apps/webapp/app/services/platform.v3.server.ts +++ b/apps/webapp/app/services/platform.v3.server.ts @@ -43,8 +43,8 @@ function initializePlatformCache() { host: env.CACHE_REDIS_HOST, username: env.CACHE_REDIS_USERNAME, password: env.CACHE_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.CACHE_REDIS_TLS_DISABLED === "true", + clusterMode: env.CACHE_REDIS_CLUSTER_MODE_ENABLED === "1", }, }); diff --git a/apps/webapp/app/services/rateLimiter.server.ts b/apps/webapp/app/services/rateLimiter.server.ts index 9fdb4da0de..3d6c5ae680 100644 --- a/apps/webapp/app/services/rateLimiter.server.ts +++ b/apps/webapp/app/services/rateLimiter.server.ts @@ -1,6 +1,7 @@ import { Ratelimit } from "@upstash/ratelimit"; -import Redis, { RedisOptions } from "ioredis"; +import { RedisOptions } from "ioredis"; import { env } from "~/env.server"; +import { createRedisClient, RedisWithClusterOptions } from "~/redis.server"; import { logger } from "./logger.server"; type Options = { @@ -32,8 +33,8 @@ export class RateLimiter { host: env.RATE_LIMIT_REDIS_HOST, username: env.RATE_LIMIT_REDIS_USERNAME, password: env.RATE_LIMIT_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true", + clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1", } ), limiter, @@ -70,8 +71,10 @@ export class RateLimiter { } } -export function createRedisRateLimitClient(redisOptions: RedisOptions): RateLimiterRedisClient { - const redis = new Redis(redisOptions); +export function createRedisRateLimitClient( + redisOptions: RedisWithClusterOptions +): RateLimiterRedisClient { + const redis = createRedisClient("trigger:rateLimiter", redisOptions); return { sadd: async (key: string, ...members: TData[]): Promise => { diff --git a/apps/webapp/app/services/realtimeClient.server.ts b/apps/webapp/app/services/realtimeClient.server.ts index 5afd4bc31a..919be0086d 100644 --- a/apps/webapp/app/services/realtimeClient.server.ts +++ b/apps/webapp/app/services/realtimeClient.server.ts @@ -3,6 +3,7 @@ import Redis, { Callback, Result, type RedisOptions } from "ioredis"; import { randomUUID } from "node:crypto"; import { longPollingFetch } from "~/utils/longPollingFetch"; import { logger } from "./logger.server"; +import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server"; export interface CachedLimitProvider { getCachedLimit: (organizationId: string, defaultValue: number) => Promise; @@ -10,7 +11,7 @@ export interface CachedLimitProvider { export type RealtimeClientOptions = { electricOrigin: string; - redis: RedisOptions; + redis: RedisWithClusterOptions; cachedLimitProvider: CachedLimitProvider; keyPrefix: string; expiryTimeInSeconds?: number; @@ -26,12 +27,12 @@ export type RealtimeRunsParams = { }; export class RealtimeClient { - private redis: Redis; + private redis: RedisClient; private expiryTimeInSeconds: number; private cachedLimitProvider: CachedLimitProvider; constructor(private options: RealtimeClientOptions) { - this.redis = new Redis(options.redis); + this.redis = createRedisClient("trigger:realtime", options.redis); this.expiryTimeInSeconds = options.expiryTimeInSeconds ?? 60 * 5; // default to 5 minutes this.cachedLimitProvider = options.cachedLimitProvider; this.#registerCommands(); diff --git a/apps/webapp/app/services/realtimeClientGlobal.server.ts b/apps/webapp/app/services/realtimeClientGlobal.server.ts index 106d22ac85..579f84b906 100644 --- a/apps/webapp/app/services/realtimeClientGlobal.server.ts +++ b/apps/webapp/app/services/realtimeClientGlobal.server.ts @@ -12,8 +12,8 @@ function initializeRealtimeClient() { host: env.RATE_LIMIT_REDIS_HOST, username: env.RATE_LIMIT_REDIS_USERNAME, password: env.RATE_LIMIT_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true", + clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1", }, cachedLimitProvider: { async getCachedLimit(organizationId, defaultValue) { diff --git a/apps/webapp/app/services/unkey/redisCacheStore.server.ts b/apps/webapp/app/services/unkey/redisCacheStore.server.ts index 059f9183d5..459b6d8e27 100644 --- a/apps/webapp/app/services/unkey/redisCacheStore.server.ts +++ b/apps/webapp/app/services/unkey/redisCacheStore.server.ts @@ -1,21 +1,20 @@ -import { Err, Ok, type Result } from "@unkey/error"; -import type { Entry, Store } from "@unkey/cache/stores"; -import type { RedisOptions } from "ioredis"; -import { Redis } from "ioredis"; import { CacheError } from "@unkey/cache"; +import type { Entry, Store } from "@unkey/cache/stores"; +import { Err, Ok, type Result } from "@unkey/error"; +import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server"; export type RedisCacheStoreConfig = { - connection: RedisOptions; + connection: RedisWithClusterOptions; }; export class RedisCacheStore implements Store { public readonly name = "redis"; - private readonly redis: Redis; + private readonly redis: RedisClient; constructor(config: RedisCacheStoreConfig) { - this.redis = new Redis(config.connection); + this.redis = createRedisClient("trigger:cacheStore", config.connection); } private buildCacheKey(namespace: TNamespace, key: string): string { diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index f763f992e7..86dc446f38 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -20,7 +20,6 @@ import { unflattenAttributes, } from "@trigger.dev/core/v3"; import { Prisma, TaskEvent, TaskEventStatus, type TaskEventKind } from "@trigger.dev/database"; -import Redis, { RedisOptions } from "ioredis"; import { createHash } from "node:crypto"; import { EventEmitter } from "node:stream"; import { Gauge } from "prom-client"; @@ -32,6 +31,7 @@ import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server"; import { startActiveSpan } from "./tracer.server"; +import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server"; const MAX_FLUSH_DEPTH = 5; @@ -97,7 +97,7 @@ export type EventBuilder = { export type EventRepoConfig = { batchSize: number; batchInterval: number; - redis: RedisOptions; + redis: RedisWithClusterOptions; retentionInDays: number; }; @@ -200,7 +200,7 @@ type TaskEventSummary = Pick< export class EventRepository { private readonly _flushScheduler: DynamicFlushScheduler; private _randomIdGenerator = new RandomIdGenerator(); - private _redisPublishClient: Redis; + private _redisPublishClient: RedisClient; private _subscriberCount = 0; get subscriberCount() { @@ -218,7 +218,7 @@ export class EventRepository { callback: this.#flushBatch.bind(this), }); - this._redisPublishClient = new Redis(this._config.redis); + this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis); } async insert(event: CreatableEvent) { @@ -989,7 +989,7 @@ export class EventRepository { } async subscribeToTrace(traceId: string) { - const redis = new Redis(this._config.redis); + const redis = createRedisClient("trigger:eventRepoSubscriber", this._config.redis); const channel = `events:${traceId}`; @@ -1147,8 +1147,8 @@ function initializeEventRepo() { host: env.PUBSUB_REDIS_HOST, username: env.PUBSUB_REDIS_USERNAME, password: env.PUBSUB_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true", + clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1", }, }); diff --git a/apps/webapp/app/v3/marqs/devPubSub.server.ts b/apps/webapp/app/v3/marqs/devPubSub.server.ts index 7576c881b5..7673768cb8 100644 --- a/apps/webapp/app/v3/marqs/devPubSub.server.ts +++ b/apps/webapp/app/v3/marqs/devPubSub.server.ts @@ -25,8 +25,8 @@ function initializeDevPubSub() { host: env.PUBSUB_REDIS_HOST, username: env.PUBSUB_REDIS_USERNAME, password: env.PUBSUB_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true", + clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1", }, schema: messageCatalog, }); diff --git a/apps/webapp/app/v3/services/projectPubSub.server.ts b/apps/webapp/app/v3/services/projectPubSub.server.ts index 09a7e11607..b39b70e577 100644 --- a/apps/webapp/app/v3/services/projectPubSub.server.ts +++ b/apps/webapp/app/v3/services/projectPubSub.server.ts @@ -26,8 +26,8 @@ function initializeProjectPubSub() { host: env.PUBSUB_REDIS_HOST, username: env.PUBSUB_REDIS_USERNAME, password: env.PUBSUB_REDIS_PASSWORD, - enableAutoPipelining: true, - ...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true", + clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1", }, schema: messageCatalog, }); diff --git a/apps/webapp/app/v3/utils/zodPubSub.server.ts b/apps/webapp/app/v3/utils/zodPubSub.server.ts index 9680f89d36..e1af35ca28 100644 --- a/apps/webapp/app/v3/utils/zodPubSub.server.ts +++ b/apps/webapp/app/v3/utils/zodPubSub.server.ts @@ -1,13 +1,13 @@ import { Logger } from "@trigger.dev/core/logger"; import { ZodMessageCatalogSchema, ZodMessageHandler } from "@trigger.dev/core/v3/zodMessageHandler"; import { Evt } from "evt"; -import Redis, { RedisOptions } from "ioredis"; import { z } from "zod"; +import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server"; import { logger } from "~/services/logger.server"; import { safeJsonParse } from "~/utils/json"; export type ZodPubSubOptions = { - redis: RedisOptions; + redis: RedisWithClusterOptions; schema: TMessageCatalog; }; @@ -23,7 +23,7 @@ export interface ZodSubscriber class RedisZodSubscriber implements ZodSubscriber { - private _subscriber: Redis; + private _subscriber: RedisClient; private _listeners: Map Promise> = new Map(); private _messageHandler: ZodMessageHandler; @@ -36,7 +36,7 @@ class RedisZodSubscriber private readonly _options: ZodPubSubOptions, private readonly _logger: Logger ) { - this._subscriber = new Redis(_options.redis); + this._subscriber = createRedisClient("trigger:zodSubscriber", _options.redis); this._messageHandler = new ZodMessageHandler({ schema: _options.schema, logger: this._logger, @@ -104,7 +104,7 @@ class RedisZodSubscriber } export class ZodPubSub { - private _publisher: Redis; + private _publisher: RedisClient; private _logger = logger.child({ module: "ZodPubSub" }); private _subscriberCount = 0; @@ -113,7 +113,7 @@ export class ZodPubSub { } constructor(private _options: ZodPubSubOptions) { - this._publisher = new Redis(_options.redis); + this._publisher = createRedisClient("trigger:zodSubscriber", _options.redis); } public async publish( diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index fd9e0014af..003f0db71a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -5,6 +5,12 @@ volumes: database-data-alt: pgadmin-data: redis-data: + redis-cluster_data-0: + redis-cluster_data-1: + redis-cluster_data-2: + redis-cluster_data-3: + redis-cluster_data-4: + redis-cluster_data-5: networks: app_network: @@ -35,24 +41,6 @@ services: - -c - shared_preload_libraries=pg_partman_bgw - pgadmin: - container_name: pgadmin - image: dpage/pgadmin4:8 - restart: always - environment: - PGADMIN_DEFAULT_EMAIL: admin@example.com - PGADMIN_DEFAULT_PASSWORD: admin - PGADMIN_DISABLE_POSTFIX: "true" - volumes: - - pgadmin-data:/var/lib/pgadmin - - ./pgadmin/servers.json:/pgadmin4/servers.json - networks: - - app_network - ports: - - 5480:80 - depends_on: - - database - redis: container_name: redis image: redis:7 @@ -64,6 +52,93 @@ services: ports: - 6379:6379 + # redis-node-0: + # image: docker.io/bitnami/redis-cluster:7.0 + # container_name: redis-node-0 + # networks: + # - app_network + # ports: + # - "6378:6379" + # volumes: + # - redis-cluster_data-0:/bitnami/redis/data + # environment: + # - "REDIS_PASSWORD=bitnami" + # - "REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + # redis-node-1: + # image: docker.io/bitnami/redis-cluster:7.0 + # container_name: redis-node-1 + # networks: + # - app_network + # ports: + # - "6380:6379" + # volumes: + # - redis-cluster_data-1:/bitnami/redis/data + # environment: + # - "REDIS_PASSWORD=bitnami" + # - "REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + # redis-node-2: + # image: docker.io/bitnami/redis-cluster:7.0 + # container_name: redis-node-2 + # networks: + # - app_network + # ports: + # - "6381:6379" + # volumes: + # - redis-cluster_data-2:/bitnami/redis/data + # environment: + # - "REDIS_PASSWORD=bitnami" + # - "REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + # redis-node-3: + # image: docker.io/bitnami/redis-cluster:7.0 + # container_name: redis-node-3 + # networks: + # - app_network + # ports: + # - "6382:6379" + # volumes: + # - redis-cluster_data-3:/bitnami/redis/data + # environment: + # - "REDIS_PASSWORD=bitnami" + # - "REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + # redis-node-4: + # image: docker.io/bitnami/redis-cluster:7.0 + # container_name: redis-node-4 + # networks: + # - app_network + # ports: + # - "6383:6379" + # volumes: + # - redis-cluster_data-4:/bitnami/redis/data + # environment: + # - "REDIS_PASSWORD=bitnami" + # - "REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + + # redis-node-5: + # image: docker.io/bitnami/redis-cluster:7.0 + # container_name: redis-node-5 + # networks: + # - app_network + # ports: + # - "6384:6379" + # volumes: + # - redis-cluster_data-5:/bitnami/redis/data + # depends_on: + # - redis-node-0 + # - redis-node-1 + # - redis-node-2 + # - redis-node-3 + # - redis-node-4 + # environment: + # - "REDIS_PASSWORD=bitnami" + # - "REDISCLI_AUTH=bitnami" + # - "REDIS_CLUSTER_REPLICAS=1" + # - "REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5" + # - "REDIS_CLUSTER_CREATOR=yes" + electric: image: electricsql/electric:1.0.0-beta.1@sha256:2262f6f09caf5fa45f233731af97b84999128170a9529e5f9b9b53642308493f restart: always