Skip to content

Commit 766f03d

Browse files
authored
chore(tesseract): Disable multi-stage pre-aggregations if not supported by cubestore (#9771)
1 parent 7b15732 commit 766f03d

File tree

10 files changed

+51
-21
lines changed

10 files changed

+51
-21
lines changed

packages/cubejs-backend-shared/src/env.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1983,7 +1983,9 @@ const variables: Record<string, (...args: any) => any> = {
19831983
cubeStoreNoHeartBeatTimeout: () => get('CUBEJS_CUBESTORE_NO_HEART_BEAT_TIMEOUT')
19841984
.default('30')
19851985
.asInt(),
1986-
1986+
cubeStoreRollingWindowJoin: () => get('CUBEJS_CUBESTORE_ROLLING_WINDOW_JOIN')
1987+
.default('false')
1988+
.asBoolStrict(),
19871989
allowUngroupedWithoutPrimaryKey: () => get('CUBEJS_ALLOW_UNGROUPED_WITHOUT_PRIMARY_KEY')
19881990
.default(get('CUBESQL_SQL_PUSH_DOWN').default('true').asString())
19891991
.asBoolStrict(),

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ export class BaseQuery {
335335
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
336336
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });
337337

338-
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0 || fullAggregateMeasures.cumulativeMeasures.length > 0;
338+
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0;
339339
}
340340
this.queryLevelJoinHints = this.options.joinHints ?? [];
341341
this.prebuildJoin();
@@ -880,6 +880,7 @@ export class BaseQuery {
880880
preAggregationQuery: this.options.preAggregationQuery,
881881
totalQuery: this.options.totalQuery,
882882
joinHints: this.options.joinHints,
883+
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
883884
};
884885

885886
const buildResult = nativeBuildSqlAndParams(queryParams);
@@ -929,7 +930,8 @@ export class BaseQuery {
929930
baseTools: this,
930931
ungrouped: this.options.ungrouped,
931932
exportAnnotatedSql: false,
932-
preAggregationQuery: this.options.preAggregationQuery
933+
preAggregationQuery: this.options.preAggregationQuery,
934+
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
933935
};
934936

935937
const buildResult = nativeBuildSqlAndParams(queryParams);

packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import moment from 'moment-timezone';
2-
import { parseSqlInterval } from '@cubejs-backend/shared';
2+
import { parseSqlInterval, getEnv } from '@cubejs-backend/shared';
33
import { BaseQuery } from './BaseQuery';
44
import { BaseFilter } from './BaseFilter';
55
import { BaseMeasure } from './BaseMeasure';
@@ -32,6 +32,13 @@ type RollingWindow = {
3232
};
3333

3434
export class CubeStoreQuery extends BaseQuery {
35+
private readonly cubeStoreRollingWindowJoin: boolean;
36+
37+
public constructor(compilers, options) {
38+
super(compilers, options);
39+
this.cubeStoreRollingWindowJoin = getEnv('cubeStoreRollingWindowJoin');
40+
}
41+
3542
public newFilter(filter) {
3643
return new CubeStoreFilter(this, filter);
3744
}
@@ -57,10 +64,16 @@ export class CubeStoreQuery extends BaseQuery {
5764
}
5865

5966
public subtractInterval(date: string, interval: string) {
67+
if (this.cubeStoreRollingWindowJoin) {
68+
return super.subtractInterval(date, interval);
69+
}
6070
return `DATE_SUB(${date}, INTERVAL ${this.formatInterval(interval)})`;
6171
}
6272

6373
public addInterval(date: string, interval: string) {
74+
if (this.cubeStoreRollingWindowJoin) {
75+
return super.addInterval(date, interval);
76+
}
6477
return `DATE_ADD(${date}, INTERVAL ${this.formatInterval(interval)})`;
6578
}
6679

@@ -185,7 +198,7 @@ export class CubeStoreQuery extends BaseQuery {
185198
cumulativeMeasures: Array<[boolean, BaseMeasure]>,
186199
preAggregationForQuery: any
187200
) {
188-
if (!cumulativeMeasures.length) {
201+
if (this.cubeStoreRollingWindowJoin || !cumulativeMeasures.length) {
189202
return super.regularAndTimeSeriesRollupQuery(regularMeasures, multipliedMeasures, cumulativeMeasures, preAggregationForQuery);
190203
}
191204
const cumulativeMeasuresWithoutMultiplied = cumulativeMeasures.map(([_, measure]) => measure);

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ describe('PreAggregationsMultiStage', () => {
6969
7070
testMeas: {
7171
type: 'countDistinct',
72-
sql: \`\${createdAtDay}\`
72+
sql: \`\${createdAtDay}\`
7373
}
7474
},
7575
@@ -188,7 +188,8 @@ describe('PreAggregationsMultiStage', () => {
188188
order: [{
189189
id: 'visitors.createdAt'
190190
}],
191-
preAggregationsSchema: ''
191+
preAggregationsSchema: '',
192+
cubestoreSupportMultistage: true
192193
});
193194

194195
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
@@ -231,7 +232,8 @@ describe('PreAggregationsMultiStage', () => {
231232
order: [{
232233
id: 'visitors.source'
233234
}],
234-
preAggregationsSchema: ''
235+
preAggregationsSchema: '',
236+
cubestoreSupportMultistage: true
235237
});
236238

237239
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
@@ -262,7 +264,8 @@ describe('PreAggregationsMultiStage', () => {
262264
order: [{
263265
id: 'visitors.source'
264266
}],
265-
preAggregationsSchema: ''
267+
preAggregationsSchema: '',
268+
cubestoreSupportMultistage: true
266269
});
267270

268271
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,6 +1922,7 @@ describe('PreAggregations', () => {
19221922
}, {
19231923
id: 'visitors.source'
19241924
}],
1925+
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
19251926
});
19261927

19271928
const queryAndParams = query.buildSqlAndParams();
@@ -1999,6 +2000,7 @@ describe('PreAggregations', () => {
19992000
}, {
20002001
id: 'visitors.source'
20012002
}],
2003+
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
20022004
});
20032005

20042006
const queryAndParams = query.buildSqlAndParams();

packages/cubejs-testing-drivers/fixtures/athena.json

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,6 @@
190190
"querying custom granularities ECommerce: count by three_months_by_march + dimension",
191191
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + no dimension",
192192
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + dimension",
193-
"pre-aggregations Customers: running total without time dimension",
194-
"querying BigECommerce: totalProfitYearAgo",
195-
"SQL API: post-aggregate percentage of total",
196193
"SQL API: Simple Rollup",
197194
"SQL API: Complex Rollup",
198195
"SQL API: Nested Rollup",
@@ -201,7 +198,6 @@
201198
"SQL API: Nested Rollup with aliases",
202199
"SQL API: Nested Rollup over asterisk",
203200
"SQL API: Extended nested Rollup over asterisk",
204-
"SQL API: Timeshift measure from cube",
205201
"SQL API: SQL push down push to cube quoted alias",
206202
"querying BigECommerce: rolling window YTD (month + week)",
207203
"querying BigECommerce: rolling window YTD (month + week + no gran)",

packages/cubejs-testing-drivers/fixtures/postgres.json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,7 @@
175175
"querying ECommerce: total quantity, avg discount, total sales, total profit by product + order + total -- noisy test",
176176
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + no dimension",
177177
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + dimension",
178-
"pre-aggregations Customers: running total without time dimension",
179-
"querying BigECommerce: totalProfitYearAgo",
180-
"SQL API: post-aggregate percentage of total",
178+
181179
"SQL API: Simple Rollup",
182180
"SQL API: Complex Rollup",
183181
"SQL API: Rollup with aliases",
@@ -186,7 +184,6 @@
186184
"SQL API: Nested Rollup with aliases",
187185
"SQL API: Nested Rollup over asterisk",
188186
"SQL API: Extended nested Rollup over asterisk",
189-
"SQL API: Timeshift measure from cube",
190187
"SQL API: SQL push down push to cube quoted alias",
191188

192189
"---- Different results comparing to baseQuery version. Need to investigate ----",

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ pub struct BaseQueryOptionsStatic {
6666
pub pre_aggregation_query: Option<bool>,
6767
#[serde(rename = "totalQuery")]
6868
pub total_query: Option<bool>,
69+
#[serde(rename = "cubestoreSupportMultistage")]
70+
pub cubestore_support_multistage: Option<bool>,
6971
}
7072

7173
#[nativebridge::native_bridge(BaseQueryOptionsStatic)]

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ impl MatchState {
2929

3030
pub struct PreAggregationOptimizer {
3131
query_tools: Rc<QueryTools>,
32+
allow_multi_stage: bool,
3233
used_pre_aggregations: HashMap<(String, String), Rc<PreAggregation>>,
3334
}
3435

3536
impl PreAggregationOptimizer {
36-
pub fn new(query_tools: Rc<QueryTools>) -> Self {
37+
pub fn new(query_tools: Rc<QueryTools>, allow_multi_stage: bool) -> Self {
3738
Self {
3839
query_tools,
40+
allow_multi_stage,
3941
used_pre_aggregations: HashMap::new(),
4042
}
4143
}
@@ -96,7 +98,10 @@ impl PreAggregationOptimizer {
9698
query: &FullKeyAggregateQuery,
9799
pre_aggregation: &Rc<CompiledPreAggregation>,
98100
) -> Result<Option<Rc<Query>>, CubeError> {
99-
if !query.multistage_members.is_empty() {
101+
if !self.allow_multi_stage && !query.multistage_members.is_empty() {
102+
return Ok(None);
103+
}
104+
if self.allow_multi_stage && !query.multistage_members.is_empty() {
100105
return self
101106
.try_rewrite_full_key_aggregate_query_with_multi_stages(query, pre_aggregation);
102107
}

rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@ pub struct BaseQuery<IT: InnerTypes> {
2020
context: NativeContextHolder<IT>,
2121
query_tools: Rc<QueryTools>,
2222
request: Rc<QueryProperties>,
23+
cubestore_support_multistage: bool,
2324
}
2425

2526
impl<IT: InnerTypes> BaseQuery<IT> {
2627
pub fn try_new(
2728
context: NativeContextHolder<IT>,
2829
options: Rc<dyn BaseQueryOptions>,
2930
) -> Result<Self, CubeError> {
31+
let cubestore_support_multistage = options
32+
.static_data()
33+
.cubestore_support_multistage
34+
.unwrap_or(false);
3035
let query_tools = QueryTools::try_new(
3136
options.cube_evaluator()?,
3237
options.base_tools()?,
@@ -41,6 +46,7 @@ impl<IT: InnerTypes> BaseQuery<IT> {
4146
context,
4247
query_tools,
4348
request,
49+
cubestore_support_multistage,
4450
})
4551
}
4652

@@ -142,8 +148,10 @@ impl<IT: InnerTypes> BaseQuery<IT> {
142148
plan: Rc<Query>,
143149
) -> Result<(Rc<Query>, Vec<Rc<PreAggregation>>), CubeError> {
144150
let result = if !self.request.is_pre_aggregation_query() {
145-
let mut pre_aggregation_optimizer =
146-
PreAggregationOptimizer::new(self.query_tools.clone());
151+
let mut pre_aggregation_optimizer = PreAggregationOptimizer::new(
152+
self.query_tools.clone(),
153+
self.cubestore_support_multistage,
154+
);
147155
if let Some(result) = pre_aggregation_optimizer.try_optimize(plan.clone())? {
148156
if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 {
149157
(

0 commit comments

Comments
 (0)