diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index ae833240b5..2ba25e04ef 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -91,6 +91,93 @@ const EnvironmentSchema = z.object({ REDIS_PASSWORD: z.string().optional(), REDIS_TLS_DISABLED: z.string().optional(), + RATE_LIMIT_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + RATE_LIMIT_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + RATE_LIMIT_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + RATE_LIMIT_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + RATE_LIMIT_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + RATE_LIMIT_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + RATE_LIMIT_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + + CACHE_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + CACHE_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + CACHE_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + CACHE_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + CACHE_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + CACHE_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + + PUBSUB_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + PUBSUB_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + PUBSUB_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + PUBSUB_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + PUBSUB_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + PUBSUB_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10), DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10), DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS: z.coerce.number().int().positive().default(1), diff --git a/apps/webapp/app/services/apiRateLimit.server.ts b/apps/webapp/app/services/apiRateLimit.server.ts index f07e7d74e0..108f75cc8a 100644 --- a/apps/webapp/app/services/apiRateLimit.server.ts +++ b/apps/webapp/app/services/apiRateLimit.server.ts @@ -5,12 +5,12 @@ import { Duration } from "./rateLimiter.server"; export const apiRateLimiter = authorizationRateLimitMiddleware({ redis: { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.RATE_LIMIT_REDIS_PORT, + host: env.RATE_LIMIT_REDIS_HOST, + username: env.RATE_LIMIT_REDIS_USERNAME, + password: env.RATE_LIMIT_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, keyPrefix: "api", defaultLimiter: { diff --git a/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts b/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts index b32f9ea1ae..6a6b6995d6 100644 --- a/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts +++ b/apps/webapp/app/services/authorizationRateLimitMiddleware.server.ts @@ -163,12 +163,12 @@ export function authorizationRateLimitMiddleware({ const redisClient = createRedisRateLimitClient( redis ?? { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.RATE_LIMIT_REDIS_PORT, + host: env.RATE_LIMIT_REDIS_HOST, + username: env.RATE_LIMIT_REDIS_USERNAME, + password: env.RATE_LIMIT_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), } ); diff --git a/apps/webapp/app/services/platform.v3.server.ts b/apps/webapp/app/services/platform.v3.server.ts index b9a348d6b6..804dd4c52d 100644 --- a/apps/webapp/app/services/platform.v3.server.ts +++ b/apps/webapp/app/services/platform.v3.server.ts @@ -39,12 +39,12 @@ function initializePlatformCache() { const redisCacheStore = new RedisCacheStore({ connection: { keyPrefix: "tr:cache:platform:v3", - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.CACHE_REDIS_PORT, + host: env.CACHE_REDIS_HOST, + username: env.CACHE_REDIS_USERNAME, + password: env.CACHE_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, }); diff --git a/apps/webapp/app/services/rateLimiter.server.ts b/apps/webapp/app/services/rateLimiter.server.ts index f2a494d1c6..9fdb4da0de 100644 --- a/apps/webapp/app/services/rateLimiter.server.ts +++ b/apps/webapp/app/services/rateLimiter.server.ts @@ -28,12 +28,12 @@ export class RateLimiter { redisClient ?? createRedisRateLimitClient( redis ?? { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.RATE_LIMIT_REDIS_PORT, + host: env.RATE_LIMIT_REDIS_HOST, + username: env.RATE_LIMIT_REDIS_USERNAME, + password: env.RATE_LIMIT_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), } ), limiter, diff --git a/apps/webapp/app/services/realtimeClientGlobal.server.ts b/apps/webapp/app/services/realtimeClientGlobal.server.ts index 514b2fb415..106d22ac85 100644 --- a/apps/webapp/app/services/realtimeClientGlobal.server.ts +++ b/apps/webapp/app/services/realtimeClientGlobal.server.ts @@ -8,12 +8,12 @@ function initializeRealtimeClient() { electricOrigin: env.ELECTRIC_ORIGIN, keyPrefix: "tr:realtime:concurrency", redis: { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.RATE_LIMIT_REDIS_PORT, + host: env.RATE_LIMIT_REDIS_HOST, + username: env.RATE_LIMIT_REDIS_USERNAME, + password: env.RATE_LIMIT_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, cachedLimitProvider: { async getCachedLimit(organizationId, defaultValue) { diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 22898e0009..f763f992e7 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -991,10 +991,10 @@ export class EventRepository { async subscribeToTrace(traceId: string) { const redis = new Redis(this._config.redis); - const channel = `events:${traceId}:*`; + const channel = `events:${traceId}`; // Subscribe to the channel. - await redis.psubscribe(channel); + await redis.subscribe(channel); // Increment the subscriber count. this._subscriberCount++; @@ -1002,15 +1002,13 @@ export class EventRepository { const eventEmitter = new EventEmitter(); // Define the message handler. - redis.on("pmessage", (pattern, channelReceived, message) => { - if (channelReceived.startsWith(`events:${traceId}:`)) { - eventEmitter.emit("message", message); - } + redis.on("message", (_, message) => { + eventEmitter.emit("message", message); }); // Return a function that can be used to unsubscribe. const unsubscribe = async () => { - await redis.punsubscribe(channel); + await redis.unsubscribe(channel); redis.quit(); this._subscriberCount--; }; @@ -1101,10 +1099,13 @@ export class EventRepository { async #publishToRedis(events: CreatableEvent[]) { if (events.length === 0) return; - const uniqueTraceSpans = new Set(events.map((e) => `events:${e.traceId}:${e.spanId}`)); - for (const id of uniqueTraceSpans) { - await this._redisPublishClient.publish(id, new Date().toISOString()); - } + const uniqueTraces = new Set(events.map((e) => `events:${e.traceId}`)); + + await Promise.allSettled( + Array.from(uniqueTraces).map((traceId) => + this._redisPublishClient.publish(traceId, new Date().toISOString()) + ) + ); } public generateTraceId() { @@ -1142,12 +1143,12 @@ function initializeEventRepo() { batchInterval: env.EVENTS_BATCH_INTERVAL, retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION, redis: { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.PUBSUB_REDIS_PORT, + host: env.PUBSUB_REDIS_HOST, + username: env.PUBSUB_REDIS_USERNAME, + password: env.PUBSUB_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, }); diff --git a/apps/webapp/app/v3/marqs/devPubSub.server.ts b/apps/webapp/app/v3/marqs/devPubSub.server.ts index 974500b1a7..7576c881b5 100644 --- a/apps/webapp/app/v3/marqs/devPubSub.server.ts +++ b/apps/webapp/app/v3/marqs/devPubSub.server.ts @@ -21,12 +21,12 @@ export const devPubSub = singleton("devPubSub", initializeDevPubSub); function initializeDevPubSub() { const pubSub = new ZodPubSub({ redis: { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.PUBSUB_REDIS_PORT, + host: env.PUBSUB_REDIS_HOST, + username: env.PUBSUB_REDIS_USERNAME, + password: env.PUBSUB_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, schema: messageCatalog, }); diff --git a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts index 5b0769e50f..6d971ce3c1 100644 --- a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts +++ b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts @@ -36,7 +36,6 @@ export type FairDequeuingStrategyOptions = { defaultOrgConcurrency: number; defaultEnvConcurrency: number; parentQueueLimit: number; - checkForDisabledOrgs: boolean; tracer: Tracer; seed?: string; /** @@ -88,7 +87,6 @@ const defaultBiases: FairDequeuingStrategyBiases = { export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { private _cache: UnkeyCache<{ concurrencyLimit: number; - disabledConcurrency: boolean; }>; private _rng: seedrandom.PRNG; @@ -107,11 +105,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { fresh: 60_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. }), - disabledConcurrency: new Namespace(ctx, { - stores: [memory], - fresh: 30_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. - stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. - }), }); this._rng = seedrandom(options.seed); @@ -512,16 +505,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => { span.setAttribute("org_id", orgId); - if (this.options.checkForDisabledOrgs) { - const isDisabled = await this.#getConcurrencyDisabled(orgId); - - if (isDisabled) { - span.setAttribute("disabled", true); - - return { current: 0, limit: 0 }; - } - } - const [currentValue, limitValue] = await Promise.all([ this.#getOrgCurrentConcurrency(orgId), this.#getOrgConcurrencyLimit(orgId), @@ -587,22 +570,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { }); } - async #getConcurrencyDisabled(orgId: string) { - return await startSpan(this.options.tracer, "getConcurrencyDisabled", async (span) => { - span.setAttribute("org_id", orgId); - - const key = this.options.keys.disabledConcurrencyLimitKey(orgId); - - const result = await this._cache.disabledConcurrency.swr(key, async () => { - const value = await this.options.redis.exists(key); - - return Boolean(value); - }); - - return typeof result.val === "boolean" ? result.val : false; - }); - } - async #getOrgConcurrencyLimit(orgId: string) { return await startSpan(this.options.tracer, "getOrgConcurrencyLimit", async (span) => { span.setAttribute("org_id", orgId); diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 7b151f9692..a6d60bab8e 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -1619,7 +1619,6 @@ function getMarQSClient() { keys: keysProducer, defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, - checkForDisabledOrgs: true, biases: { concurrencyLimitBias: env.MARQS_CONCURRENCY_LIMIT_BIAS, availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS, @@ -1635,7 +1634,6 @@ function getMarQSClient() { keys: keysProducer, defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, - checkForDisabledOrgs: false, biases: { concurrencyLimitBias: 0.0, availableCapacityBias: 0.0, diff --git a/apps/webapp/app/v3/marqs/v2.server.ts b/apps/webapp/app/v3/marqs/v2.server.ts index ba637715ba..fee98846b4 100644 --- a/apps/webapp/app/v3/marqs/v2.server.ts +++ b/apps/webapp/app/v3/marqs/v2.server.ts @@ -81,7 +81,6 @@ function getMarQSClient() { keys: new MarQSV2KeyProducer(KEY_PREFIX), defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, - checkForDisabledOrgs: true, }), envQueuePriorityStrategy: new NoopFairDequeuingStrategy(), // We don't use this in v2, since all queues go through the shared queue workers: 0, diff --git a/apps/webapp/app/v3/services/projectPubSub.server.ts b/apps/webapp/app/v3/services/projectPubSub.server.ts index 7c2f7c6c05..09a7e11607 100644 --- a/apps/webapp/app/v3/services/projectPubSub.server.ts +++ b/apps/webapp/app/v3/services/projectPubSub.server.ts @@ -22,12 +22,12 @@ export const projectPubSub = singleton("projectPubSub", initializeProjectPubSub) function initializeProjectPubSub() { const pubSub = new ZodPubSub({ redis: { - port: env.REDIS_PORT, - host: env.REDIS_HOST, - username: env.REDIS_USERNAME, - password: env.REDIS_PASSWORD, + port: env.PUBSUB_REDIS_PORT, + host: env.PUBSUB_REDIS_HOST, + username: env.PUBSUB_REDIS_USERNAME, + password: env.PUBSUB_REDIS_PASSWORD, enableAutoPipelining: true, - ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + ...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, schema: messageCatalog, }); diff --git a/apps/webapp/test/fairDequeuingStrategy.test.ts b/apps/webapp/test/fairDequeuingStrategy.test.ts index df851377bc..fa96d11bc4 100644 --- a/apps/webapp/test/fairDequeuingStrategy.test.ts +++ b/apps/webapp/test/fairDequeuingStrategy.test.ts @@ -23,7 +23,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "test-seed-1", // for deterministic shuffling }); @@ -53,7 +52,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 2, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "test-seed-2", }); @@ -89,7 +87,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 2, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "test-seed-3", }); @@ -114,40 +111,6 @@ describe("FairDequeuingStrategy", () => { expect(result).toHaveLength(0); }); - redisTest("should handle disabled orgs", async ({ redis }) => { - const keyProducer = createKeyProducer("test"); - const strategy = new FairDequeuingStrategy({ - tracer, - redis, - keys: keyProducer, - defaultOrgConcurrency: 10, - defaultEnvConcurrency: 5, - parentQueueLimit: 100, - checkForDisabledOrgs: true, - seed: "test-seed-4", - }); - - await setupQueue({ - redis, - keyProducer, - parentQueue: "parent-queue", - score: Date.now() - 1000, - queueId: "queue-1", - orgId: "org-1", - envId: "env-1", - }); - - await setupConcurrency({ - redis, - keyProducer, - org: { id: "org-1", currentConcurrency: 0, isDisabled: true }, - env: { id: "env-1", currentConcurrency: 0 }, - }); - - const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); - expect(result).toHaveLength(0); - }); - redisTest("should respect parentQueueLimit", async ({ redis }) => { const keyProducer = createKeyProducer("test"); const strategy = new FairDequeuingStrategy({ @@ -157,7 +120,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 2, // Only take 2 queues - checkForDisabledOrgs: true, seed: "test-seed-6", }); @@ -212,7 +174,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 10, - checkForDisabledOrgs: true, seed: "test-seed-reuse-1", reuseSnapshotCount: 1, }); @@ -302,7 +263,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "test-seed-5", }); @@ -460,7 +420,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "fixed-seed", }); @@ -620,7 +579,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: `test-seed-${i}`, biases: { concurrencyLimitBias: 0.8, @@ -705,7 +663,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "fixed-seed", biases: { concurrencyLimitBias: 0, @@ -791,7 +748,6 @@ describe("FairDequeuingStrategy", () => { defaultOrgConcurrency: 10, defaultEnvConcurrency: 5, parentQueueLimit: 100, - checkForDisabledOrgs: true, seed: "test-seed-max-orgs", maximumOrgCount: 2, // Only select top 2 orgs }); diff --git a/references/v3-catalog/src/trigger/queues.ts b/references/v3-catalog/src/trigger/queues.ts index ec97993ab6..c6b2841ee9 100644 --- a/references/v3-catalog/src/trigger/queues.ts +++ b/references/v3-catalog/src/trigger/queues.ts @@ -1,4 +1,4 @@ -import { logger, task, wait } from "@trigger.dev/sdk/v3"; +import { logger, runs, task, wait } from "@trigger.dev/sdk/v3"; export const queuesController = task({ id: "queues/controller", @@ -40,3 +40,20 @@ export const namedQueueTask = task({ logger.info("named-queue"); }, }); + +export const inspectApiTraffic = task({ + id: "queues/inspect-api-traffic", + run: async (payload: unknown, { ctx }) => { + // Retrieve the run 100 times + for (let i = 0; i < 100; i++) { + await runs.retrieve(ctx.run.id); + } + + const response = await runs.retrieve(ctx.run.id).asResponse(); + + // Log out the headers + const headers = Object.fromEntries(response.headers.entries()); + + logger.info("Headers", { headers }); + }, +});