Skip to content

Commit c388350

Browse files
authored
fix(query-orchestrator): QueryQueue - update heartbeat by queue id (#9817)
Cube Store allows updating the heartbeat by path or queue id. Using queue id provides better performance and fixes possible concurrency issues, such as slippage
1 parent bfec376 commit c388350

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
export type QueryDef = unknown;
22
// Primary key of Queue item
33
export type QueueId = string | number | bigint;
4-
// This was used as lock for Redis, deprecated.
5-
export type ProcessingId = string | number;
4+
// This was used as a lock for Redis, deprecated.
5+
export type ProcessingId = string | number | bigint;
66
export type QueryKey = (string | [string, any[]]) & {
77
persistent?: true,
88
};
@@ -64,25 +64,25 @@ export interface QueueDriverConnectionInterface {
6464
* @param keyScore Redis specific thing
6565
* @param queryKey
6666
* @param orphanedTime
67-
* @param queryHandler Our queue allow to use different handlers. For example query, cvsQuery, etc.
67+
* @param queryHandler Our queue allows using different handlers. For example, query, cvsQuery, etc.
6868
* @param query
6969
* @param priority
7070
* @param options
7171
*/
7272
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
73-
// Return query keys which was sorted by priority and time
73+
// Return query keys that were sorted by priority and time
7474
getToProcessQueries(): Promise<QueryKeysTuple[]>;
7575
getActiveQueries(): Promise<QueryKeysTuple[]>;
7676
getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null>;
77-
// Queries which was added to queue, but was not processed and not needed
77+
// Queries that were added to queue, but was not processed and not needed
7878
getOrphanedQueries(): Promise<QueryKeysTuple[]>;
79-
// Queries which was not completed with old heartbeat
79+
// Queries that were not completed with old heartbeat
8080
getStalledQueries(): Promise<QueryKeysTuple[]>;
8181
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
8282
updateHeartBeat(hash: QueryKeyHash, queueId: QueueId | null): Promise<void>;
8383
getNextProcessingId(): Promise<ProcessingId>;
8484
// Trying to acquire a lock for processing a queue item, this method can return null when
85-
// multiple nodes tries to process the same query
85+
// multiple nodes try to process the same query
8686
retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise<RetrieveForProcessingResponse>;
8787
freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise<void>;
8888
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise<boolean>;

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ export class QueryQueue {
807807
queryProcessHeartbeat = Date.now();
808808
}
809809

810-
return queueConnection.updateHeartBeat(queryKeyHashed);
810+
return queueConnection.updateHeartBeat(queryKeyHashed, queueId);
811811
},
812812
this.heartBeatInterval * 1000
813813
);

0 commit comments

Comments
 (0)