Skip to content

Commit d14e5c6

Browse files
committed
refactor(query-orchestrator): Local queue driver - handle processing keys correctly
1 parent e17baa0 commit d14e5c6

File tree

2 files changed

+27
-28
lines changed

2 files changed

+27
-28
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,14 @@ export class LocalQueueDriverConnection {
291291
}
292292

293293
let added = 0;
294+
294295
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
295-
this.active[key] = { key, order: processingId };
296+
this.active[key] = { key, order: processingId, queueId: processingId };
297+
delete this.toProcess[key];
298+
296299
added = 1;
297300
}
301+
298302
this.heartBeat[key] = { key, order: new Date().getTime() };
299303

300304
if (this.getQueueEventsBus) {

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import R from 'ramda';
22
import { EventEmitter } from 'events';
33
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
4-
import { QueueDriverInterface, QueryKey, QueryKeyHash, QueueId, QueryDef } from '@cubejs-backend/base-driver';
4+
import {
5+
QueueDriverInterface,
6+
QueryKey,
7+
QueryKeyHash,
8+
QueueId,
9+
QueryDef,
10+
QueryStageStateResponse
11+
} from '@cubejs-backend/base-driver';
512
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
613

714
import { TimeoutError } from './TimeoutError';
@@ -557,31 +564,20 @@ export class QueryQueue {
557564
}
558565
}));
559566

560-
/**
561-
* There is a bug somewhere in Redis (maybe in memory too?),
562-
* which doesn't remove queue item from pending, while it's in active state
563-
*
564-
* TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time
565-
* TODO(ovr): Migrate to getToProcessQueries after removal of Redis
566-
*/
567-
const [active, toProcess] = await queueConnection.getActiveAndToProcess();
567+
const [_active, toProcess] = await queueConnection.getActiveAndToProcess();
568568

569569
await Promise.all(
570570
R.pipe(
571571
R.filter(([queryKey, _queueId]) => {
572-
if (active.findIndex(([p, _a]) => p === queryKey) === -1) {
573-
const subKeys = queryKey.split('@');
574-
if (subKeys.length === 1) {
575-
// common queries
576-
return true;
577-
} else if (subKeys[1] === this.processUid) {
578-
// current process persistent queries
579-
return true;
580-
} else {
581-
// other processes persistent queries
582-
return false;
583-
}
572+
const subKeys = queryKey.split('@');
573+
if (subKeys.length === 1) {
574+
// common queries
575+
return true;
576+
} else if (subKeys[1] === this.processUid) {
577+
// current process persistent queries
578+
return true;
584579
} else {
580+
// other processes persistent queries
585581
return false;
586582
}
587583
}),
@@ -627,7 +623,7 @@ export class QueryQueue {
627623
* Returns the list of queries planned to be processed and the list of active
628624
* queries.
629625
*
630-
* @returns {Array}
626+
* @returns {Promise<QueryStageStateResponse>}
631627
*/
632628
async fetchQueryStageState() {
633629
const queueConnection = await this.queueDriver.createConnection();
@@ -644,26 +640,25 @@ export class QueryQueue {
644640
*
645641
* @param {*} stageQueryKey
646642
* @param {number=} priorityFilter
647-
* @param {Array=} queryStageState
643+
* @param {QueryStageStateResponse=} queryStageState
648644
* @returns {Promise<undefined> | Promise<{ stage: string, timeElapsed: number }>}
649645
*/
650646
async getQueryStage(stageQueryKey, priorityFilter, queryStageState) {
651647
const [active, toProcess, allQueryDefs] = queryStageState || await this.fetchQueryStageState();
652648

653-
const queryDefs = toProcess.map(k => allQueryDefs[k]).filter(q => !!q);
654-
const queryInQueue = queryDefs.find(
649+
const queryInQueue = Object.values(allQueryDefs).find(
655650
q => this.redisHash(q.stageQueryKey) === this.redisHash(stageQueryKey) &&
656651
(priorityFilter != null ? q.priority === priorityFilter : true)
657652
);
658-
659653
if (queryInQueue) {
660654
if (active.indexOf(this.redisHash(queryInQueue.queryKey)) !== -1) {
661655
return {
662656
stage: 'Executing query',
663657
timeElapsed: queryInQueue.startQueryTime ? new Date().getTime() - queryInQueue.startQueryTime : undefined
664658
};
665659
}
666-
const index = queryDefs.filter(q => active.indexOf(this.redisHash(q.queryKey)) === -1).indexOf(queryInQueue);
660+
661+
const index = toProcess.indexOf(this.redisHash(queryInQueue.queryKey));
667662
if (index !== -1) {
668663
return index !== -1 ? { stage: `#${index + 1} in queue` } : undefined;
669664
}

0 commit comments

Comments
 (0)