Skip to content

Commit 0e5ec8b

Browse files
authored
Only allow a single dev queue consumer to dequeue at once (#1737)
* Only allow a single dev queue consumer to dequeue at once * Expire dev queue consumer connections keys * Add missing return
1 parent d2ceb82 commit 0e5ec8b

File tree

3 files changed

+48
-1
lines changed

3 files changed

+48
-1
lines changed

apps/webapp/app/v3/authenticatedSocketConnection.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ export class AuthenticatedSocketConnection {
4343
});
4444
});
4545
},
46+
canSendMessage: () => ws.readyState === WebSocket.OPEN,
4647
});
4748

48-
this._consumer = new DevQueueConsumer(authenticatedEnv, this._sender, {
49+
this._consumer = new DevQueueConsumer(this.id, authenticatedEnv, this._sender, {
4950
ipAddress: Array.isArray(this.ipAddress) ? this.ipAddress.join(", ") : this.ipAddress,
5051
});
5152

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2222
import { getMaxDuration } from "../utils/maxDuration";
2323
import { DevSubscriber, devPubSub } from "./devPubSub.server";
2424
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
25+
import { createRedisClient, RedisClient } from "~/redis.server";
2526

2627
const MessageBody = z.discriminatedUnion("type", [
2728
z.object({
@@ -53,14 +54,21 @@ export class DevQueueConsumer {
5354
private _currentSpan: Span | undefined;
5455
private _endSpanInNextIteration = false;
5556
private _inProgressRuns: Map<string, string> = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
57+
private _connectionLostAt?: Date;
58+
private _redisClient: RedisClient;
5659

5760
constructor(
61+
public id: string,
5862
public env: AuthenticatedEnvironment,
5963
private _sender: ZodMessageSender<typeof serverWebsocketMessages>,
6064
private _options: DevQueueConsumerOptions = {}
6165
) {
6266
this._traceTimeoutSeconds = _options.traceTimeoutSeconds ?? 60;
6367
this._maximumItemsPerTrace = _options.maximumItemsPerTrace ?? 1_000;
68+
this._redisClient = createRedisClient("tr:devQueueConsumer", {
69+
keyPrefix: "tr:devQueueConsumer:",
70+
...devPubSub.redisOptions,
71+
});
6472
}
6573

6674
// This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it
@@ -235,6 +243,8 @@ export class DevQueueConsumer {
235243
return;
236244
}
237245

246+
await this._redisClient.set(`connection:${this.env.id}`, this.id, "EX", 60 * 60 * 24); // 24 hours
247+
238248
this._enabled = true;
239249
// Create the session
240250
await createNewSession(this.env, this._options.ipAddress ?? "unknown");
@@ -252,6 +262,38 @@ export class DevQueueConsumer {
252262
return;
253263
}
254264

265+
const canSendMessage = await this._sender.validateCanSendMessage();
266+
267+
if (!canSendMessage) {
268+
this._connectionLostAt ??= new Date();
269+
270+
if (Date.now() - this._connectionLostAt.getTime() > 60 * 1000) {
271+
logger.debug("Connection lost for more than 60 seconds, stopping the consumer", {
272+
env: this.env,
273+
});
274+
275+
await this.stop("Connection lost for more than 60 seconds");
276+
return;
277+
}
278+
279+
setTimeout(() => this.#doWork(), 1000);
280+
return;
281+
}
282+
283+
this._connectionLostAt = undefined;
284+
285+
const currentConnection = await this._redisClient.get(`connection:${this.env.id}`);
286+
287+
if (currentConnection && currentConnection !== this.id) {
288+
logger.debug("Another connection is active, stopping the consumer", {
289+
currentConnection,
290+
env: this.env,
291+
});
292+
293+
await this.stop("Another connection is active");
294+
return;
295+
}
296+
255297
// Check if the trace has expired
256298
if (
257299
this._perTraceCountdown === 0 ||

apps/webapp/app/v3/utils/zodPubSub.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {
116116
this._publisher = createRedisClient("trigger:zodSubscriber", _options.redis);
117117
}
118118

119+
get redisOptions() {
120+
return this._options.redis;
121+
}
122+
119123
public async publish<K extends keyof TMessageCatalog>(
120124
channel: string,
121125
type: K,

0 commit comments

Comments
 (0)