diff --git a/packages/cubejs-server-core/src/core/OrchestratorStorage.ts b/packages/cubejs-server-core/src/core/OrchestratorStorage.ts index cc1f6de078b00..d8eb87d2c904b 100644 --- a/packages/cubejs-server-core/src/core/OrchestratorStorage.ts +++ b/packages/cubejs-server-core/src/core/OrchestratorStorage.ts @@ -12,6 +12,8 @@ export class OrchestratorStorage { }); } + protected readonly initializers: Map> = new Map(); + public has(orchestratorId: string) { return this.storage.has(orchestratorId); } @@ -24,6 +26,29 @@ export class OrchestratorStorage { return this.storage.set(orchestratorId, orchestratorApi); } + public async getOrInit(orchestratorId: string, init: () => Promise): Promise { + if (this.storage.has(orchestratorId)) { + return this.storage.get(orchestratorId); + } + + if (this.initializers.has(orchestratorId)) { + return this.initializers.get(orchestratorId); + } + + try { + const initPromise = init(); + this.initializers.set(orchestratorId, initPromise); + + const instance = await initPromise; + + this.storage.set(orchestratorId, instance); + + return instance; + } finally { + this.initializers.delete(orchestratorId); + } + } + public clear() { this.storage.clear(); } diff --git a/packages/cubejs-server-core/src/core/server.ts b/packages/cubejs-server-core/src/core/server.ts index 1db5966d5af31..52ec4ff4aae8b 100644 --- a/packages/cubejs-server-core/src/core/server.ts +++ b/packages/cubejs-server-core/src/core/server.ts @@ -569,88 +569,49 @@ export class CubejsServerCore { public async getOrchestratorApi(context: RequestContext): Promise { const orchestratorId = await this.contextToOrchestratorId(context); - if (this.orchestratorStorage.has(orchestratorId)) { - return this.orchestratorStorage.get(orchestratorId); - } - - /** - * Hash table to store promises which will be resolved with the - * datasource drivers. DriverFactoryByDataSource function is closure - * this constant. - */ - const driverPromise: Record> = {}; - - let externalPreAggregationsDriverPromise: Promise | null = null; - - const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this); - const externalDbType = this.contextToExternalDbType(context); - - // orchestrator options can be empty, if user didn't define it. - // so we are adding default and configuring queues concurrency. - const orchestratorOptions = - this.optsHandler.getOrchestratorInitializedOptions( - context, - (await this.orchestratorOptions(context)) || {}, - ); - - const orchestratorApi = this.createOrchestratorApi( + return this.orchestratorStorage.getOrInit(orchestratorId, async () => { /** - * Driver factory function `DriverFactoryByDataSource`. + * Hash table to store promises which will be resolved with the + * datasource drivers. DriverFactoryByDataSource function is a closure + * this constant. */ - async (dataSource = 'default') => { - if (driverPromise[dataSource]) { - return driverPromise[dataSource]; - } + const driverPromise: Record> = {}; - // eslint-disable-next-line no-return-assign - return driverPromise[dataSource] = (async () => { - let driver: BaseDriver | null = null; - - try { - driver = await this.resolveDriver( - { - ...context, - dataSource, - }, - orchestratorOptions, - ); - - if (typeof driver === 'object' && driver != null) { - if (driver.setLogger) { - driver.setLogger(this.logger); - } + let externalPreAggregationsDriverPromise: Promise | null = null; - await driver.testConnection(); + const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this); + const externalDbType = this.contextToExternalDbType(context); - return driver; - } - - throw new Error( - `Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}` - ); - } catch (e) { - driverPromise[dataSource] = null; - - if (driver) { - await driver.release(); - } + // orchestrator options can be empty, if user didn't define it. + // so we are adding default and configuring queues concurrency. + const orchestratorOptions = + this.optsHandler.getOrchestratorInitializedOptions( + context, + (await this.orchestratorOptions(context)) || {}, + ); - throw e; - } - })(); - }, - { - externalDriverFactory: this.options.externalDriverFactory && (async () => { - if (externalPreAggregationsDriverPromise) { - return externalPreAggregationsDriverPromise; + return this.createOrchestratorApi( + /** + * Driver factory function `DriverFactoryByDataSource`. + */ + async (dataSource = 'default') => { + if (driverPromise[dataSource]) { + return driverPromise[dataSource]; } // eslint-disable-next-line no-return-assign - return externalPreAggregationsDriverPromise = (async () => { + return driverPromise[dataSource] = (async () => { let driver: BaseDriver | null = null; try { - driver = await this.options.externalDriverFactory(context); + driver = await this.resolveDriver( + { + ...context, + dataSource, + }, + orchestratorOptions, + ); + if (typeof driver === 'object' && driver != null) { if (driver.setLogger) { driver.setLogger(this.logger); @@ -662,10 +623,10 @@ export class CubejsServerCore { } throw new Error( - `Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}` + `Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}` ); } catch (e) { - externalPreAggregationsDriverPromise = null; + driverPromise[dataSource] = null; if (driver) { await driver.release(); @@ -674,23 +635,56 @@ export class CubejsServerCore { throw e; } })(); - }), - contextToDbType: async (dataSource) => contextToDbType({ - ...context, - dataSource - }), - // speedup with cache - contextToExternalDbType: () => externalDbType, - redisPrefix: orchestratorId, - skipExternalCacheAndQueue: externalDbType === 'cubestore', - cacheAndQueueDriver: this.options.cacheAndQueueDriver, - ...orchestratorOptions, - } - ); + }, + { + externalDriverFactory: this.options.externalDriverFactory && (async () => { + if (externalPreAggregationsDriverPromise) { + return externalPreAggregationsDriverPromise; + } + + // eslint-disable-next-line no-return-assign + return externalPreAggregationsDriverPromise = (async () => { + let driver: BaseDriver | null = null; + + try { + driver = await this.options.externalDriverFactory(context); + if (typeof driver === 'object' && driver != null) { + if (driver.setLogger) { + driver.setLogger(this.logger); + } - this.orchestratorStorage.set(orchestratorId, orchestratorApi); + await driver.testConnection(); + + return driver; + } - return orchestratorApi; + throw new Error( + `Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}` + ); + } catch (e) { + externalPreAggregationsDriverPromise = null; + + if (driver) { + await driver.release(); + } + + throw e; + } + })(); + }), + contextToDbType: async (dataSource) => contextToDbType({ + ...context, + dataSource + }), + // speedup with cache + contextToExternalDbType: () => externalDbType, + redisPrefix: orchestratorId, + skipExternalCacheAndQueue: externalDbType === 'cubestore', + cacheAndQueueDriver: this.options.cacheAndQueueDriver, + ...orchestratorOptions, + } + ); + }); } protected createCompilerApi(repository, options: Record = {}) {