Skip to content

Commit fa01292

Browse files
committed
fix(query-orchestrator): QueryQueue - improve performance for high concurrency setups
# Conflicts: # packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js
1 parent dcdfce1 commit fa01292

File tree

2 files changed

+37
-9
lines changed

2 files changed

+37
-9
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,31 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
136136
}
137137

138138
public async getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse> {
139+
const active: QueryKeysTuple[] = [];
140+
const toProcess: QueryKeysTuple[] = [];
141+
142+
const rows = await this.driver.query<CubeStoreListResponse>('QUEUE LIST ?', [
143+
this.options.redisQueuePrefix
144+
]);
145+
if (rows.length) {
146+
for (const row of rows) {
147+
if (row.status === 'active') {
148+
active.push([
149+
row.id as QueryKeyHash,
150+
row.queue_id ? parseInt(row.queue_id, 10) : null,
151+
]);
152+
} else {
153+
toProcess.push([
154+
row.id as QueryKeyHash,
155+
row.queue_id ? parseInt(row.queue_id, 10) : null,
156+
]);
157+
}
158+
}
159+
}
160+
139161
return [
140-
// We don't return active queries, because it's useless
141-
// There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl
142-
// Cube Store provides strict guarantees that queue item cannot be active & pending in the same time
143-
[],
144-
await this.getToProcessQueries()
162+
active,
163+
toProcess,
145164
];
146165
}
147166

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,16 @@ export class QueryQueue {
564564
}
565565
}));
566566

567-
const [_active, toProcess] = await queueConnection.getActiveAndToProcess();
567+
const [active, toProcess] = await queueConnection.getActiveAndToProcess();
568+
569+
/**
570+
* Important notice: Concurrency configuration works per a specific queue, not per node.
571+
*
572+
* In production clusters where it contains N nodes, it shares the same concurrency. It leads to a point
573+
* where every node tries to pick up jobs as much as concurrency is defined for the whole cluster. To minimize
574+
* the effect of competition between nodes, it's important to reduce the number of tries to process by active jobs.
575+
*/
576+
const toProcessLimit = active.length >= this.concurrency ? 1 : this.concurrency - active.length;
568577

569578
await Promise.all(
570579
R.pipe(
@@ -581,7 +590,7 @@ export class QueryQueue {
581590
return false;
582591
}
583592
}),
584-
R.take(this.concurrency),
593+
R.take(toProcessLimit),
585594
R.map((([queryKey, queueId]) => this.sendProcessMessageFn(queryKey, queueId)))
586595
)(toProcess)
587596
);
@@ -740,8 +749,8 @@ export class QueryQueue {
740749
}
741750

742751
/**
743-
* Processing query specified by the `queryKey`. This method encapsulate most
744-
* of the logic related with the queues updates, heartbeat, etc.
752+
* Processing query specified by the `queryKey`. This method encapsulates most
753+
* of the logic related to the queue updates, heartbeat, etc.
745754
*
746755
* @param {QueryKeyHash} queryKeyHashed
747756
* @param {QueueId | null} queueId Supported by new Cube Store and Memory

0 commit comments

Comments
 (0)