diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index bdbb1f33872fe..7187683388da1 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -174,8 +174,6 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { toProcess.push(row.id as string); } else if (row.status === 'active') { active.push(row.id as string); - // TODO: getQueryStage is broken for Executing query stage... - toProcess.push(row.id as string); } } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js index 0aa8bebc4f110..4ebb67a5747e6 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js @@ -152,12 +152,14 @@ export class LocalQueueDriverConnection { } let added = 0; - if (!this.toProcess[key]) { + + if (!this.toProcess[key] && !this.active[key]) { this.toProcess[key] = { order: keyScore, queueId: options.queueId, key }; + added = 1; } @@ -291,10 +293,14 @@ export class LocalQueueDriverConnection { } let added = 0; + if (Object.keys(this.active).length < this.concurrency && !this.active[key]) { - this.active[key] = { key, order: processingId }; + this.active[key] = { key, order: processingId, queueId: processingId }; + delete this.toProcess[key]; + added = 1; } + this.heartBeat[key] = { key, order: new Date().getTime() }; if (this.getQueueEventsBus) { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index fee5aa7250fc6..06fbe8f7d49ae 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -1,7 +1,14 @@ import R from 'ramda'; import { EventEmitter } from 'events'; import { getEnv, getProcessUid } from '@cubejs-backend/shared'; -import { QueueDriverInterface, QueryKey, QueryKeyHash, QueueId, QueryDef } from '@cubejs-backend/base-driver'; +import { + QueueDriverInterface, + QueryKey, + QueryKeyHash, + QueueId, + QueryDef, + QueryStageStateResponse +} from '@cubejs-backend/base-driver'; import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; import { TimeoutError } from './TimeoutError'; @@ -557,31 +564,20 @@ export class QueryQueue { } })); - /** - * There is a bug somewhere in Redis (maybe in memory too?), - * which doesn't remove queue item from pending, while it's in active state - * - * TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time - * TODO(ovr): Migrate to getToProcessQueries after removal of Redis - */ - const [active, toProcess] = await queueConnection.getActiveAndToProcess(); + const [_active, toProcess] = await queueConnection.getActiveAndToProcess(); await Promise.all( R.pipe( R.filter(([queryKey, _queueId]) => { - if (active.findIndex(([p, _a]) => p === queryKey) === -1) { - const subKeys = queryKey.split('@'); - if (subKeys.length === 1) { - // common queries - return true; - } else if (subKeys[1] === this.processUid) { - // current process persistent queries - return true; - } else { - // other processes persistent queries - return false; - } + const subKeys = queryKey.split('@'); + if (subKeys.length === 1) { + // common queries + return true; + } else if (subKeys[1] === this.processUid) { + // current process persistent queries + return true; } else { + // other processes persistent queries return false; } }), @@ -627,7 +623,7 @@ export class QueryQueue { * Returns the list of queries planned to be processed and the list of active * queries. * - * @returns {Array} + * @returns {Promise} */ async fetchQueryStageState() { const queueConnection = await this.queueDriver.createConnection(); @@ -644,18 +640,16 @@ export class QueryQueue { * * @param {*} stageQueryKey * @param {number=} priorityFilter - * @param {Array=} queryStageState + * @param {QueryStageStateResponse=} queryStageState * @returns {Promise | Promise<{ stage: string, timeElapsed: number }>} */ async getQueryStage(stageQueryKey, priorityFilter, queryStageState) { const [active, toProcess, allQueryDefs] = queryStageState || await this.fetchQueryStageState(); - const queryDefs = toProcess.map(k => allQueryDefs[k]).filter(q => !!q); - const queryInQueue = queryDefs.find( + const queryInQueue = Object.values(allQueryDefs).find( q => this.redisHash(q.stageQueryKey) === this.redisHash(stageQueryKey) && (priorityFilter != null ? q.priority === priorityFilter : true) ); - if (queryInQueue) { if (active.indexOf(this.redisHash(queryInQueue.queryKey)) !== -1) { return { @@ -663,7 +657,10 @@ export class QueryQueue { timeElapsed: queryInQueue.startQueryTime ? new Date().getTime() - queryInQueue.startQueryTime : undefined }; } - const index = queryDefs.filter(q => active.indexOf(this.redisHash(q.queryKey)) === -1).indexOf(queryInQueue); + + const index = toProcess + .filter((queryKey) => (priorityFilter != null ? allQueryDefs[queryKey]?.priority === priorityFilter : true)) + .indexOf(this.redisHash(queryInQueue.queryKey)); if (index !== -1) { return index !== -1 ? { stage: `#${index + 1} in queue` } : undefined; }