Skip to content

Commit 05bb520

Browse files
authored
feat(tesseract): Rollup Join support (#9745)
1 parent 8a43bc4 commit 05bb520

File tree

16 files changed

+694
-206
lines changed

16 files changed

+694
-206
lines changed

packages/cubejs-schema-compiler/src/compiler/CubeEvaluator.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,22 @@ export class CubeEvaluator extends CubeSymbols {
571571
}));
572572
}
573573

574+
public preAggregationDescriptionByName(cubeName: string, preAggName: string) {
575+
const cube = this.cubeFromPath(cubeName);
576+
const preAggregations = cube.preAggregations || {};
577+
578+
const preAgg = preAggregations[preAggName];
579+
580+
if (!preAgg) {
581+
return undefined;
582+
}
583+
584+
return {
585+
name: preAggName,
586+
...(preAgg as Record<string, any>)
587+
};
588+
}
589+
574590
/**
575591
* Returns pre-aggregations filtered by the specified selector.
576592
*/
@@ -785,6 +801,14 @@ export class CubeEvaluator extends CubeSymbols {
785801
return { cubeReferencesUsed, pathReferencesUsed, evaluatedSql };
786802
}
787803

804+
/**
805+
* Evaluates rollup references for retrieving rollupReference used in Tesseract.
806+
* This is a temporary solution until Tesseract takes ownership of all pre-aggregations.
807+
*/
808+
public evaluateRollupReferences<T extends ToString | Array<ToString>>(cube: string, rollupReferences: (...args: Array<unknown>) => T) {
809+
return this.evaluateReferences(cube, rollupReferences, { originalSorting: true });
810+
}
811+
788812
public evaluatePreAggregationReferences(cube: string, aggregation: PreAggregationDefinition): PreAggregationReferences {
789813
const timeDimensions: Array<PreAggregationTimeDimensionReference> = [];
790814

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/evaluator.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,16 @@ pub trait CubeEvaluator {
6464
&self,
6565
cube_name: String,
6666
) -> Result<Vec<Rc<dyn PreAggregationDescription>>, CubeError>;
67+
#[nbridge(optional)]
68+
fn pre_aggregation_description_by_name(
69+
&self,
70+
cube_name: String,
71+
name: String,
72+
) -> Result<Option<Rc<dyn PreAggregationDescription>>, CubeError>;
73+
#[nbridge(vec)]
74+
fn evaluate_rollup_references(
75+
&self,
76+
cube: String,
77+
sql: Rc<dyn MemberSql>,
78+
) -> Result<Vec<String>, CubeError>;
6779
}

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_description.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ pub trait PreAggregationDescription {
3232

3333
#[nbridge(field, optional)]
3434
fn time_dimension_reference(&self) -> Result<Option<Rc<dyn MemberSql>>, CubeError>;
35+
36+
#[nbridge(field, optional)]
37+
fn rollup_references(&self) -> Result<Option<Rc<dyn MemberSql>>, CubeError>;
3538
}
Lines changed: 31 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,40 @@
1-
use crate::cube_bridge::member_sql::MemberSql;
2-
use crate::cube_bridge::pre_aggregation_description::PreAggregationDescription;
3-
use crate::planner::query_tools::QueryTools;
4-
use crate::planner::sql_evaluator::MemberSymbol;
5-
use cubenativeutils::CubeError;
1+
use crate::planner::sql_evaluator::{MemberSymbol, SqlCall};
62
use std::fmt::Debug;
73
use std::rc::Rc;
4+
5+
#[derive(Clone)]
6+
pub struct PreAggregationJoinItem {
7+
pub from: PreAggregationTable,
8+
pub to: PreAggregationTable,
9+
pub from_members: Vec<Rc<MemberSymbol>>,
10+
pub to_members: Vec<Rc<MemberSymbol>>,
11+
pub on_sql: Rc<SqlCall>,
12+
}
13+
14+
#[derive(Clone)]
15+
pub struct PreAggregationJoin {
16+
pub root: PreAggregationTable,
17+
pub items: Vec<PreAggregationJoinItem>,
18+
}
19+
20+
#[derive(Clone)]
21+
pub struct PreAggregationTable {
22+
pub cube_name: String,
23+
pub name: String,
24+
pub alias: Option<String>,
25+
}
26+
27+
#[derive(Clone)]
28+
pub enum PreAggregationSource {
29+
Table(PreAggregationTable),
30+
Join(PreAggregationJoin),
31+
}
32+
833
#[derive(Clone)]
934
pub struct CompiledPreAggregation {
1035
pub cube_name: String,
1136
pub name: String,
37+
pub source: Rc<PreAggregationSource>,
1238
pub granularity: Option<String>,
1339
pub external: Option<bool>,
1440
pub measures: Vec<Rc<MemberSymbol>>,
@@ -34,104 +60,3 @@ impl Debug for CompiledPreAggregation {
3460
.finish()
3561
}
3662
}
37-
38-
impl CompiledPreAggregation {
39-
pub fn try_new(
40-
query_tools: Rc<QueryTools>,
41-
cube_name: &String,
42-
description: Rc<dyn PreAggregationDescription>,
43-
) -> Result<Rc<Self>, CubeError> {
44-
let static_data = description.static_data();
45-
let measures = if let Some(refs) = description.measure_references()? {
46-
Self::symbols_from_ref(query_tools.clone(), cube_name, refs, Self::check_is_measure)?
47-
} else {
48-
Vec::new()
49-
};
50-
let dimensions = if let Some(refs) = description.dimension_references()? {
51-
Self::symbols_from_ref(
52-
query_tools.clone(),
53-
cube_name,
54-
refs,
55-
Self::check_is_dimension,
56-
)?
57-
} else {
58-
Vec::new()
59-
};
60-
let time_dimensions = if let Some(refs) = description.time_dimension_reference()? {
61-
let dims = Self::symbols_from_ref(
62-
query_tools.clone(),
63-
cube_name,
64-
refs,
65-
Self::check_is_time_dimension,
66-
)?;
67-
/* if dims.len() != 1 {
68-
return Err(CubeError::user(format!(
69-
"Pre aggregation should contains only one time dimension"
70-
)));
71-
} */
72-
vec![(dims[0].clone(), static_data.granularity.clone())] //TODO remove unwrap
73-
} else {
74-
Vec::new()
75-
};
76-
let allow_non_strict_date_range_match = description
77-
.static_data()
78-
.allow_non_strict_date_range_match
79-
.unwrap_or(false);
80-
let res = Rc::new(Self {
81-
name: static_data.name.clone(),
82-
cube_name: cube_name.clone(),
83-
granularity: static_data.granularity.clone(),
84-
external: static_data.external,
85-
measures,
86-
dimensions,
87-
time_dimensions,
88-
allow_non_strict_date_range_match,
89-
});
90-
Ok(res)
91-
}
92-
93-
fn symbols_from_ref<F: Fn(&MemberSymbol) -> Result<(), CubeError>>(
94-
query_tools: Rc<QueryTools>,
95-
cube_name: &String,
96-
ref_func: Rc<dyn MemberSql>,
97-
check_type_fn: F,
98-
) -> Result<Vec<Rc<MemberSymbol>>, CubeError> {
99-
let evaluator_compiler_cell = query_tools.evaluator_compiler().clone();
100-
let mut evaluator_compiler = evaluator_compiler_cell.borrow_mut();
101-
let sql_call = evaluator_compiler.compile_sql_call(cube_name, ref_func)?;
102-
let mut res = Vec::new();
103-
for symbol in sql_call.get_dependencies().iter() {
104-
check_type_fn(&symbol)?;
105-
res.push(symbol.clone());
106-
}
107-
Ok(res)
108-
}
109-
110-
fn check_is_measure(symbol: &MemberSymbol) -> Result<(), CubeError> {
111-
symbol
112-
.as_measure()
113-
.map_err(|_| CubeError::user(format!("Pre-aggregation measure must be a measure")))?;
114-
Ok(())
115-
}
116-
117-
fn check_is_dimension(symbol: &MemberSymbol) -> Result<(), CubeError> {
118-
symbol.as_dimension().map_err(|_| {
119-
CubeError::user(format!("Pre-aggregation dimension must be a dimension"))
120-
})?;
121-
Ok(())
122-
}
123-
124-
fn check_is_time_dimension(symbol: &MemberSymbol) -> Result<(), CubeError> {
125-
let dimension = symbol.as_dimension().map_err(|_| {
126-
CubeError::user(format!(
127-
"Pre-aggregation time dimension must be a dimension"
128-
))
129-
})?;
130-
if dimension.dimension_type() != "time" {
131-
return Err(CubeError::user(format!(
132-
"Pre-aggregation time dimension must be a dimension"
133-
)));
134-
}
135-
Ok(())
136-
}
137-
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ mod measure_matcher;
44
mod optimizer;
55
mod original_sql_collector;
66
mod original_sql_optimizer;
7+
mod pre_aggregations_compiler;
78

89
pub use compiled_pre_aggregation::*;
910
use dimension_matcher::*;
1011
use measure_matcher::*;
1112
pub use optimizer::*;
1213
pub use original_sql_collector::*;
1314
pub use original_sql_optimizer::*;
15+
pub use pre_aggregations_compiler::*;

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 39 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::PreAggregationsCompiler;
12
use super::*;
23
use crate::logical_plan::*;
34
use crate::plan::FilterItem;
@@ -43,22 +44,9 @@ impl PreAggregationOptimizer {
4344
let mut cube_names_collector = CubeNamesCollector::new();
4445
cube_names_collector.collect(&plan)?;
4546
let cube_names = cube_names_collector.result();
47+
let mut compiler = PreAggregationsCompiler::try_new(self.query_tools.clone(), &cube_names)?;
4648

47-
let mut compiled_pre_aggregations = Vec::new();
48-
for cube_name in cube_names.iter() {
49-
let pre_aggregations = self
50-
.query_tools
51-
.cube_evaluator()
52-
.pre_aggregations_for_cube_as_array(cube_name.clone())?;
53-
for pre_aggregation in pre_aggregations.iter() {
54-
let compiled = CompiledPreAggregation::try_new(
55-
self.query_tools.clone(),
56-
cube_name,
57-
pre_aggregation.clone(),
58-
)?;
59-
compiled_pre_aggregations.push(compiled);
60-
}
61-
}
49+
let compiled_pre_aggregations = compiler.compile_all_pre_aggregations()?;
6250

6351
for pre_aggregation in compiled_pre_aggregations.iter() {
6452
let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?;
@@ -413,51 +401,44 @@ impl PreAggregationOptimizer {
413401
&mut self,
414402
pre_aggregation: &Rc<CompiledPreAggregation>,
415403
) -> Result<Rc<PreAggregation>, CubeError> {
416-
let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name(
404+
/* let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name(
417405
pre_aggregation.cube_name.clone(),
418406
pre_aggregation.name.clone(),
419-
)?;
420-
if let Some(table_name) = &pre_aggregation_obj.static_data().table_name {
421-
let schema = LogicalSchema {
422-
time_dimensions: vec![],
423-
dimensions: pre_aggregation
424-
.dimensions
425-
.iter()
426-
.cloned()
427-
.chain(
428-
pre_aggregation
429-
.time_dimensions
430-
.iter()
431-
.map(|(d, _)| d.clone()),
432-
)
433-
.collect(),
434-
measures: pre_aggregation.measures.to_vec(),
435-
multiplied_measures: HashSet::new(),
436-
};
437-
let pre_aggregation = PreAggregation {
438-
name: pre_aggregation.name.clone(),
439-
time_dimensions: pre_aggregation.time_dimensions.clone(),
440-
dimensions: pre_aggregation.dimensions.clone(),
441-
measures: pre_aggregation.measures.clone(),
442-
schema: Rc::new(schema),
443-
external: pre_aggregation.external.unwrap_or_default(),
444-
granularity: pre_aggregation.granularity.clone(),
445-
table_name: table_name.clone(),
446-
cube_name: pre_aggregation.cube_name.clone(),
447-
pre_aggregation_obj,
448-
};
449-
let result = Rc::new(pre_aggregation);
450-
self.used_pre_aggregations.insert(
451-
(result.cube_name.clone(), result.name.clone()),
452-
result.clone(),
453-
);
454-
Ok(result)
455-
} else {
456-
Err(CubeError::internal(format!(
457-
"Cannot find pre aggregation object for cube {} and name {}",
458-
pre_aggregation.cube_name, pre_aggregation.name
459-
)))
460-
}
407+
)?; */
408+
//if let Some(table_name) = &pre_aggregation_obj.static_data().table_name {
409+
let schema = LogicalSchema {
410+
time_dimensions: vec![],
411+
dimensions: pre_aggregation
412+
.dimensions
413+
.iter()
414+
.cloned()
415+
.chain(
416+
pre_aggregation
417+
.time_dimensions
418+
.iter()
419+
.map(|(d, _)| d.clone()),
420+
)
421+
.collect(),
422+
measures: pre_aggregation.measures.to_vec(),
423+
multiplied_measures: HashSet::new(),
424+
};
425+
let pre_aggregation = PreAggregation {
426+
name: pre_aggregation.name.clone(),
427+
time_dimensions: pre_aggregation.time_dimensions.clone(),
428+
dimensions: pre_aggregation.dimensions.clone(),
429+
measures: pre_aggregation.measures.clone(),
430+
schema: Rc::new(schema),
431+
external: pre_aggregation.external.unwrap_or_default(),
432+
granularity: pre_aggregation.granularity.clone(),
433+
source: pre_aggregation.source.clone(),
434+
cube_name: pre_aggregation.cube_name.clone(),
435+
};
436+
let result = Rc::new(pre_aggregation);
437+
self.used_pre_aggregations.insert(
438+
(result.cube_name.clone(), result.name.clone()),
439+
result.clone(),
440+
);
441+
Ok(result)
461442
}
462443

463444
fn is_schema_and_filters_match(

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/original_sql_optimizer.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::PreAggregationsCompiler;
12
use super::*;
23
use crate::logical_plan::*;
34
use crate::planner::query_tools::QueryTools;
@@ -352,25 +353,11 @@ impl OriginalSqlOptimizer {
352353
let res = if let Some(found_pre_aggregation) = self.foud_pre_aggregations.get(cube_name) {
353354
Some(found_pre_aggregation.clone())
354355
} else {
355-
let pre_aggregations = self
356-
.query_tools
357-
.cube_evaluator()
358-
.pre_aggregations_for_cube_as_array(cube_name.clone())?;
359-
if let Some(found_pre_aggregation) = pre_aggregations
360-
.iter()
361-
.find(|p| p.static_data().pre_aggregation_type == "originalSql")
362-
{
363-
let compiled = CompiledPreAggregation::try_new(
364-
self.query_tools.clone(),
365-
cube_name,
366-
found_pre_aggregation.clone(),
367-
)?;
368-
self.foud_pre_aggregations
369-
.insert(cube_name.clone(), compiled.clone());
370-
Some(compiled)
371-
} else {
372-
None
373-
}
356+
let mut compiler = PreAggregationsCompiler::try_new(
357+
self.query_tools.clone(),
358+
&vec![cube_name.clone()],
359+
)?;
360+
compiler.compile_origin_sql_pre_aggregation(&cube_name)?
374361
};
375362
Ok(res)
376363
}

0 commit comments

Comments
 (0)