Skip to content

Commit 6fd5ee4

Browse files
authored
feat: Introduce rollupLambda rollup type (cube-js#5315)
* feat: Introduce `rollupLambda` rollup type * chore: Introduce `rollupLambda` rollup type -- fix unit tests * chore: Introduce `rollupLambda` rollup type -- add TODO
1 parent 60075af commit 6fd5ee4

File tree

6 files changed

+147
-42
lines changed

6 files changed

+147
-42
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,8 +563,9 @@ class BaseQuery {
563563
const preAggForQuery = this.preAggregations.findPreAggregationForQuery();
564564
const result = {};
565565
if (preAggForQuery && preAggForQuery.preAggregation.unionWithSourceData) {
566+
const lambdaPreAgg = preAggForQuery.referencedPreAggregations[0];
566567
// TODO(cristipp) Use source query instead of preaggregation references.
567-
const references = this.cubeEvaluator.evaluatePreAggregationReferences(preAggForQuery.cube, preAggForQuery.preAggregation);
568+
const references = this.cubeEvaluator.evaluatePreAggregationReferences(lambdaPreAgg.cube, lambdaPreAgg.preAggregation);
568569
const lambdaQuery = this.newSubQuery(
569570
{
570571
measures: references.measures,
@@ -593,7 +594,7 @@ class BaseQuery {
593594
() => this.cacheKeyQueries(),
594595
{ preAggregationQuery: true }
595596
);
596-
result[this.preAggregations.preAggregationId(preAggForQuery)] = { sqlAndParams, cacheKeyQueries };
597+
result[this.preAggregations.preAggregationId(lambdaPreAgg)] = { sqlAndParams, cacheKeyQueries };
597598
}
598599
return result;
599600
}

packages/cubejs-schema-compiler/src/adapter/PreAggregations.js

Lines changed: 100 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ export class PreAggregations {
6060
if (foundPreAggregation.preAggregation.type === 'rollupJoin') {
6161
preAggregations = foundPreAggregation.preAggregationsToJoin;
6262
}
63+
if (foundPreAggregation.preAggregation.type === 'rollupLambda') {
64+
preAggregations = foundPreAggregation.referencedPreAggregations;
65+
}
6366
return preAggregations.map(preAggregation => {
6467
if (this.canPartitionsBeUsed(preAggregation)) {
6568
const { dimension, partitionDimension } = this.partitionDimension(preAggregation);
@@ -679,33 +682,8 @@ export class PreAggregations {
679682
R.toPairs,
680683
// eslint-disable-next-line no-unused-vars
681684
// eslint-disable-next-line @typescript-eslint/no-unused-vars
682-
R.filter(([k, a]) => a.type === 'rollup' || a.type === 'rollupJoin'),
683-
R.map(([preAggregationName, preAggregation]) => {
684-
const preAggObj = this.evaluatedPreAggregationObj(
685-
cube, preAggregationName, preAggregation, canUsePreAggregation
686-
);
687-
if (preAggregation.type === 'rollupJoin') {
688-
// TODO evaluation optimizations. Should be cached or moved to compile time.
689-
const preAggregationsToJoin = preAggObj.references.rollups.map(
690-
name => {
691-
const [joinCube, joinPreAggregationName] = this.query.cubeEvaluator.parsePath('preAggregations', name);
692-
return this.evaluatedPreAggregationObj(
693-
joinCube,
694-
joinPreAggregationName,
695-
this.query.cubeEvaluator.byPath('preAggregations', name),
696-
canUsePreAggregation
697-
);
698-
}
699-
);
700-
return {
701-
...preAggObj,
702-
preAggregationsToJoin,
703-
rollupJoin: this.buildRollupJoin(preAggObj, preAggregationsToJoin)
704-
};
705-
} else {
706-
return preAggObj;
707-
}
708-
})
685+
R.filter(([k, a]) => a.type === 'rollup' || a.type === 'rollupJoin' || a.type === 'rollupLambda'),
686+
R.map(([preAggregationName, preAggregation]) => this.evaluatedPreAggregationObj(cube, preAggregationName, preAggregation, canUsePreAggregation))
709687
)(preAggregations);
710688
}
711689

@@ -789,13 +767,85 @@ export class PreAggregations {
789767

790768
evaluatedPreAggregationObj(cube, preAggregationName, preAggregation, canUsePreAggregation) {
791769
const references = this.evaluateAllReferences(cube, preAggregation);
792-
return {
770+
const preAggObj = {
793771
preAggregationName,
794772
preAggregation,
795773
cube,
796774
canUsePreAggregation: canUsePreAggregation(references),
797775
references
798776
};
777+
778+
if (preAggregation.type === 'rollupJoin') {
779+
// TODO evaluation optimizations. Should be cached or moved to compile time.
780+
const preAggregationsToJoin = preAggObj.references.rollups.map(
781+
name => {
782+
const [joinCube, joinPreAggregationName] = this.query.cubeEvaluator.parsePath('preAggregations', name);
783+
return this.evaluatedPreAggregationObj(
784+
joinCube,
785+
joinPreAggregationName,
786+
this.query.cubeEvaluator.byPath('preAggregations', name),
787+
canUsePreAggregation
788+
);
789+
}
790+
);
791+
return {
792+
...preAggObj,
793+
preAggregationsToJoin,
794+
rollupJoin: this.buildRollupJoin(preAggObj, preAggregationsToJoin)
795+
};
796+
} else if (preAggregation.type === 'rollupLambda') {
797+
// TODO evaluation optimizations. Should be cached or moved to compile time.
798+
const referencedPreAggregations = preAggObj.references.rollups.map(
799+
name => {
800+
const [referencedCube, referencedPreAggregation] = this.query.cubeEvaluator.parsePath('preAggregations', name);
801+
return this.evaluatedPreAggregationObj(
802+
referencedCube,
803+
referencedPreAggregation,
804+
this.query.cubeEvaluator.byPath('preAggregations', name),
805+
canUsePreAggregation
806+
);
807+
}
808+
);
809+
if (referencedPreAggregations.length === 0) {
810+
throw new UserError(`rollupLambda '${cube}.${preAggregationName}' should reference at least on rollup`);
811+
}
812+
if (referencedPreAggregations.length > 1) {
813+
throw new UserError(`rollupLambda '${cube}.${preAggregationName}' references multiple rollups. This feature is currently in early access preview. Please get in touch with us to get access to it: https://cube.dev/contact.`);
814+
}
815+
referencedPreAggregations[referencedPreAggregations.length - 1] = {
816+
...referencedPreAggregations[referencedPreAggregations.length - 1],
817+
preAggregation: {
818+
...referencedPreAggregations[referencedPreAggregations.length - 1].preAggregation,
819+
unionWithSourceData: preAggObj.preAggregation.unionWithSourceData,
820+
}
821+
};
822+
referencedPreAggregations.forEach(referencedPreAggregation => {
823+
PreAggregations.memberNameMismatchValidation(preAggObj, referencedPreAggregation, 'measures');
824+
PreAggregations.memberNameMismatchValidation(preAggObj, referencedPreAggregation, 'dimensions');
825+
PreAggregations.memberNameMismatchValidation(preAggObj, referencedPreAggregation, 'timeDimensions');
826+
});
827+
return {
828+
...preAggObj,
829+
referencedPreAggregations,
830+
};
831+
} else {
832+
return preAggObj;
833+
}
834+
}
835+
836+
static memberNameMismatchValidation(preAggA, preAggB, memberType) {
837+
const preAggAMemberNames = PreAggregations.memberShortNames(preAggA.references[memberType], memberType === 'timeDimensions');
838+
const preAggBMemberNames = PreAggregations.memberShortNames(preAggB.references[memberType], memberType === 'timeDimensions');
839+
if (!R.equals(
840+
preAggAMemberNames,
841+
preAggBMemberNames
842+
)) {
843+
throw new UserError(`Names for ${memberType} doesn't match between '${preAggA.cube}.${preAggA.preAggregationName}' and '${preAggB.cube}.${preAggB.preAggregationName}': ${JSON.stringify(preAggAMemberNames)} != ${JSON.stringify(preAggBMemberNames)}`);
844+
}
845+
}
846+
847+
static memberShortNames(memberArray, isTimeDimension) {
848+
return memberArray.map(member => (isTimeDimension ? member.dimension.split('.')[1] : member.split('.')[1]));
799849
}
800850

801851
rollupMatchResultDescriptions() {
@@ -906,7 +956,23 @@ export class PreAggregations {
906956
}
907957

908958
evaluateAllReferences(cube, aggregation) {
909-
return this.query.cubeEvaluator.evaluatePreAggregationReferences(cube, aggregation);
959+
const references = this.query.cubeEvaluator.evaluatePreAggregationReferences(cube, aggregation);
960+
if (aggregation.type === 'rollupLambda') {
961+
if (references.rollups.length > 0) {
962+
const [firstLambdaCube] = this.query.cubeEvaluator.parsePath('preAggregations', references.rollups[0]);
963+
const firstLambdaPreAggregation = this.query.cubeEvaluator.byPath('preAggregations', references.rollups[0]);
964+
const firstLambdaReferences = this.query.cubeEvaluator.evaluatePreAggregationReferences(firstLambdaCube, firstLambdaPreAggregation);
965+
966+
if (references.measures.length === 0 &&
967+
references.dimensions.length === 0 &&
968+
references.timeDimensions.length === 0) {
969+
return { ...firstLambdaReferences, rollups: references.rollups };
970+
} else {
971+
return references;
972+
}
973+
}
974+
}
975+
return references;
910976
}
911977

912978
originalSqlPreAggregationTable(preAggregationDescription) {
@@ -953,6 +1019,11 @@ export class PreAggregations {
9531019
})
9541020
)
9551021
];
1022+
} else if (preAggregationForQuery.preAggregation.type === 'rollupLambda') {
1023+
const lambdaPreAggregations = preAggregationForQuery.referencedPreAggregations;
1024+
1025+
// TODO support lambda union of rollups
1026+
toJoin = [sqlAndAlias(lambdaPreAggregations[0])];
9561027
} else {
9571028
toJoin = [sqlAndAlias(preAggregationForQuery)];
9581029
}

packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,13 @@ export class CubeSymbols {
138138
preAggregation.type = 'rollup';
139139
}
140140

141-
if (preAggregation.scheduledRefresh === undefined) {
142-
if (preAggregation.type === 'rollupJoin') {
143-
preAggregation.scheduledRefresh = false;
144-
} else {
145-
preAggregation.scheduledRefresh = getEnv('scheduledRefreshDefault');
146-
}
141+
if (preAggregation.scheduledRefresh === undefined && preAggregation.type !== 'rollupJoin' && preAggregation.type !== 'rollupLambda') {
142+
preAggregation.scheduledRefresh = getEnv('scheduledRefreshDefault');
147143
}
148144

149-
if (preAggregation.external === undefined) {
145+
if (preAggregation.external === undefined && preAggregation.type !== 'rollupLambda') {
150146
preAggregation.external =
147+
// TODO remove rollupJoin from this list and update validation
151148
['rollup', 'rollupJoin'].includes(preAggregation.type) &&
152149
getEnv('externalDefault');
153150
}

packages/cubejs-schema-compiler/src/compiler/CubeValidator.js

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ const BasePreAggregationWithoutPartitionGranularity = {
180180
sql: Joi.func().required()
181181
},
182182
readOnly: Joi.boolean().strict(),
183-
unionWithSourceData: Joi.boolean().strict(),
184183
};
185184

186185
const BasePreAggregation = {
@@ -289,6 +288,28 @@ const RollUpJoinSchema = condition(
289288
)
290289
);
291290

291+
const RollupLambdaSchema = condition(
292+
(s) => defined(s.granularity) || defined(s.timeDimension),
293+
{
294+
type: Joi.any().valid('rollupLambda').required(),
295+
granularity: GranularitySchema,
296+
timeDimension: Joi.func().required(),
297+
rollups: Joi.func().required(),
298+
measures: Joi.func(),
299+
dimensions: Joi.func(),
300+
segments: Joi.func(),
301+
unionWithSourceData: Joi.boolean().strict(),
302+
},
303+
{
304+
type: Joi.any().valid('rollupLambda').required(),
305+
rollups: Joi.func().required(),
306+
measures: Joi.func(),
307+
dimensions: Joi.func(),
308+
segments: Joi.func(),
309+
unionWithSourceData: Joi.boolean().strict(),
310+
},
311+
);
312+
292313
const RollUpSchema = condition(
293314
(s) => defined(s.granularity) || defined(s.timeDimension) || defined(s.timeDimensionReference),
294315
condition(
@@ -338,6 +359,7 @@ const PreAggregationsAlternatives = Joi.object().pattern(
338359
{ is: 'autoRollup', then: AutoRollupSchema },
339360
{ is: 'originalSql', then: OriginalSqlSchema },
340361
{ is: 'rollupJoin', then: RollUpJoinSchema },
362+
{ is: 'rollupLambda', then: RollupLambdaSchema },
341363
{ is: 'rollup',
342364
then: RollUpSchema,
343365
otherwise: Joi.object().keys({

packages/cubejs-schema-compiler/test/unit/pre-aggregations.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,6 @@ describe('pre-aggregations', () => {
8989

9090
expect(cubeEvaluator.cubeFromPath('Users').preAggregations.usersRollup.scheduledRefresh).toEqual(true);
9191
expect(cubeEvaluator.cubeFromPath('Orders').preAggregations.ordersRollup.scheduledRefresh).toEqual(true);
92-
expect(cubeEvaluator.cubeFromPath('Orders').preAggregations.ordersRollupJoin.scheduledRefresh).toEqual(false);
92+
expect(cubeEvaluator.cubeFromPath('Orders').preAggregations.ordersRollupJoin.scheduledRefresh).toEqual(undefined);
9393
});
9494
});

packages/cubejs-testing/birdbox-fixtures/lambda/schema/Orders.js

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,23 @@ cube(`Orders`, {
22
sql: `SELECT * FROM public.orders`,
33

44
preAggregations: {
5-
ordersByCompletedAt: {
5+
ordersByCompletedAtLambda: {
6+
type: `rollupLambda`,
7+
rollups: [ordersByCompletedAt],
8+
unionWithSourceData: true,
9+
},
10+
11+
ordersByCompletedAtAndUserIdLambda: {
12+
type: `rollupLambda`,
13+
measures: [count],
14+
dimensions: [status, userId],
15+
timeDimension: completedAt,
16+
granularity: `day`,
17+
rollups: [ordersByCompletedAtAndUserId],
618
unionWithSourceData: true,
19+
},
20+
21+
ordersByCompletedAt: {
722
measures: [count],
823
dimensions: [status],
924
timeDimension: completedAt,
@@ -21,7 +36,6 @@ cube(`Orders`, {
2136
},
2237

2338
ordersByCompletedAtAndUserId: {
24-
unionWithSourceData: true,
2539
measures: [count],
2640
dimensions: [status, userId],
2741
timeDimension: completedAt,

0 commit comments

Comments
 (0)