Skip to content

Commit bf43fd4

Browse files
authored
Use priority offsets in MarQS and schedule future messages using redis worker (#1720)
* Improve the MarQS priority system by moving future messages into the LRE worker and using a priority timestamp offset to define priority in messages * Add a test to make sure priority offsets don't unfairly favor environments * requeuing should clear concurrency sets * Heartbeats should only reschedule existing heartbeat jobs * Fix type error
1 parent 7186b1e commit bf43fd4

22 files changed

+788
-978
lines changed

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { singleton } from "~/utils/singleton";
77
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
88
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
99
import { prisma } from "~/db.server";
10+
import { marqs } from "./marqs/index.server";
1011

1112
function initializeWorker() {
1213
const redisOptions = {
@@ -49,6 +50,15 @@ function initializeWorker() {
4950
maxAttempts: 10,
5051
},
5152
},
53+
scheduleRequeueMessage: {
54+
schema: z.object({
55+
messageId: z.string(),
56+
}),
57+
visibilityTimeoutMs: 60_000,
58+
retry: {
59+
maxAttempts: 5,
60+
},
61+
},
5262
},
5363
concurrency: {
5464
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
@@ -74,6 +84,9 @@ function initializeWorker() {
7484
attempt
7585
);
7686
},
87+
scheduleRequeueMessage: async ({ payload }) => {
88+
await marqs.requeueMessageById(payload.messageId);
89+
},
7790
},
7891
});
7992

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export const MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET = 31_556_952 * 1000; // 1 year
2+
export const MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET = 15_778_476 * 1000; // 6 months
3+
export const MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS = 500;
4+
export const MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS = 500;

apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments
 (0)