Skip to content

Commit 2d66576

Browse files
authored
refactor(query-orchestrator): LocalQueue driver - correct handling for pending/active (#9706)
I did a refactor for the `LocalQueue` driver. Previously, while the `LocalQueue` driver processed the queue item, it stored it in the pending and active states. Both states at the same time. This approach is not aligned with any queue servers. Cube Store has a strict obligation that a queue item can be in only one state at a time. Let's align it to simplify upcoming changes in #9705.
1 parent 9b8e95a commit 2d66576

File tree

3 files changed

+32
-31
lines changed

3 files changed

+32
-31
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,6 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
174174
toProcess.push(row.id as string);
175175
} else if (row.status === 'active') {
176176
active.push(row.id as string);
177-
// TODO: getQueryStage is broken for Executing query stage...
178-
toProcess.push(row.id as string);
179177
}
180178
}
181179

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,14 @@ export class LocalQueueDriverConnection {
152152
}
153153

154154
let added = 0;
155-
if (!this.toProcess[key]) {
155+
156+
if (!this.toProcess[key] && !this.active[key]) {
156157
this.toProcess[key] = {
157158
order: keyScore,
158159
queueId: options.queueId,
159160
key
160161
};
162+
161163
added = 1;
162164
}
163165

@@ -291,10 +293,14 @@ export class LocalQueueDriverConnection {
291293
}
292294

293295
let added = 0;
296+
294297
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
295-
this.active[key] = { key, order: processingId };
298+
this.active[key] = { key, order: processingId, queueId: processingId };
299+
delete this.toProcess[key];
300+
296301
added = 1;
297302
}
303+
298304
this.heartBeat[key] = { key, order: new Date().getTime() };
299305

300306
if (this.getQueueEventsBus) {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import R from 'ramda';
22
import { EventEmitter } from 'events';
33
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
4-
import { QueueDriverInterface, QueryKey, QueryKeyHash, QueueId, QueryDef } from '@cubejs-backend/base-driver';
4+
import {
5+
QueueDriverInterface,
6+
QueryKey,
7+
QueryKeyHash,
8+
QueueId,
9+
QueryDef,
10+
QueryStageStateResponse
11+
} from '@cubejs-backend/base-driver';
512
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
613

714
import { TimeoutError } from './TimeoutError';
@@ -557,31 +564,20 @@ export class QueryQueue {
557564
}
558565
}));
559566

560-
/**
561-
* There is a bug somewhere in Redis (maybe in memory too?),
562-
* which doesn't remove queue item from pending, while it's in active state
563-
*
564-
* TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time
565-
* TODO(ovr): Migrate to getToProcessQueries after removal of Redis
566-
*/
567-
const [active, toProcess] = await queueConnection.getActiveAndToProcess();
567+
const [_active, toProcess] = await queueConnection.getActiveAndToProcess();
568568

569569
await Promise.all(
570570
R.pipe(
571571
R.filter(([queryKey, _queueId]) => {
572-
if (active.findIndex(([p, _a]) => p === queryKey) === -1) {
573-
const subKeys = queryKey.split('@');
574-
if (subKeys.length === 1) {
575-
// common queries
576-
return true;
577-
} else if (subKeys[1] === this.processUid) {
578-
// current process persistent queries
579-
return true;
580-
} else {
581-
// other processes persistent queries
582-
return false;
583-
}
572+
const subKeys = queryKey.split('@');
573+
if (subKeys.length === 1) {
574+
// common queries
575+
return true;
576+
} else if (subKeys[1] === this.processUid) {
577+
// current process persistent queries
578+
return true;
584579
} else {
580+
// other processes persistent queries
585581
return false;
586582
}
587583
}),
@@ -627,7 +623,7 @@ export class QueryQueue {
627623
* Returns the list of queries planned to be processed and the list of active
628624
* queries.
629625
*
630-
* @returns {Array}
626+
* @returns {Promise<QueryStageStateResponse>}
631627
*/
632628
async fetchQueryStageState() {
633629
const queueConnection = await this.queueDriver.createConnection();
@@ -644,26 +640,27 @@ export class QueryQueue {
644640
*
645641
* @param {*} stageQueryKey
646642
* @param {number=} priorityFilter
647-
* @param {Array=} queryStageState
643+
* @param {QueryStageStateResponse=} queryStageState
648644
* @returns {Promise<undefined> | Promise<{ stage: string, timeElapsed: number }>}
649645
*/
650646
async getQueryStage(stageQueryKey, priorityFilter, queryStageState) {
651647
const [active, toProcess, allQueryDefs] = queryStageState || await this.fetchQueryStageState();
652648

653-
const queryDefs = toProcess.map(k => allQueryDefs[k]).filter(q => !!q);
654-
const queryInQueue = queryDefs.find(
649+
const queryInQueue = Object.values(allQueryDefs).find(
655650
q => this.redisHash(q.stageQueryKey) === this.redisHash(stageQueryKey) &&
656651
(priorityFilter != null ? q.priority === priorityFilter : true)
657652
);
658-
659653
if (queryInQueue) {
660654
if (active.indexOf(this.redisHash(queryInQueue.queryKey)) !== -1) {
661655
return {
662656
stage: 'Executing query',
663657
timeElapsed: queryInQueue.startQueryTime ? new Date().getTime() - queryInQueue.startQueryTime : undefined
664658
};
665659
}
666-
const index = queryDefs.filter(q => active.indexOf(this.redisHash(q.queryKey)) === -1).indexOf(queryInQueue);
660+
661+
const index = toProcess
662+
.filter((queryKey) => (priorityFilter != null ? allQueryDefs[queryKey]?.priority === priorityFilter : true))
663+
.indexOf(this.redisHash(queryInQueue.queryKey));
667664
if (index !== -1) {
668665
return index !== -1 ? { stage: `#${index + 1} in queue` } : undefined;
669666
}

0 commit comments

Comments
 (0)