Skip to content

Commit 14c6409

Browse files
authored
Merge pull request #7651 from leiysky/refactor-executor
chore(planner): Minor refactor of `PhysicalScalar`
2 parents 3c2993e + 80785f6 commit 14c6409

File tree

16 files changed

+109
-175
lines changed

16 files changed

+109
-175
lines changed

src/query/service/src/api/rpc/flight_scatter_hash_v2.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@ use crate::api::rpc::flight_scatter::FlightScatter;
2727
use crate::evaluator::EvalNode;
2828
use crate::evaluator::Evaluator;
2929
use crate::evaluator::TypedVector;
30-
use crate::sql::executor::ColumnID;
3130
use crate::sql::executor::PhysicalScalar;
3231

3332
#[derive(Clone)]
3433
pub struct HashFlightScatterV2 {
3534
func_ctx: FunctionContext,
36-
hash_keys: Vec<EvalNode<ColumnID>>,
35+
hash_keys: Vec<EvalNode>,
3736
hash_functions: Vec<Box<dyn Function>>,
3837
scatter_size: usize,
3938
}
@@ -118,7 +117,7 @@ impl HashFlightScatterV2 {
118117
struct OneHashKeyFlightScatter {
119118
scatter_size: usize,
120119
func_ctx: FunctionContext,
121-
indices_scalar: EvalNode<ColumnID>,
120+
indices_scalar: EvalNode,
122121
}
123122

124123
impl OneHashKeyFlightScatter {
@@ -129,7 +128,7 @@ impl OneHashKeyFlightScatter {
129128
) -> Result<Box<dyn FlightScatter>> {
130129
let hash_key = Evaluator::eval_physical_scalar(scalar)?;
131130

132-
let mut sip_hash = EvalNode::<ColumnID>::Function {
131+
let mut sip_hash = EvalNode::Function {
133132
args: vec![hash_key],
134133
func: FunctionFactory::instance().get("sipHash", &[&scalar.data_type()])?,
135134
};
@@ -146,7 +145,7 @@ impl OneHashKeyFlightScatter {
146145
Ok(Box::new(OneHashKeyFlightScatter {
147146
scatter_size,
148147
func_ctx,
149-
indices_scalar: EvalNode::<ColumnID>::Function {
148+
indices_scalar: EvalNode::Function {
150149
args: vec![sip_hash, EvalNode::Constant {
151150
value: DataValue::UInt64(scatter_size as u64),
152151
data_type: DataTypeImpl::UInt64(UInt64Type::new()),

src/query/service/src/evaluator/eval_context.rs

Lines changed: 0 additions & 101 deletions
This file was deleted.

src/query/service/src/evaluator/eval_node.rs

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt::Debug;
16-
15+
use common_datablocks::DataBlock;
1716
use common_datavalues::ColumnWithField;
1817
use common_datavalues::DataField;
1918
use common_datavalues::DataTypeImpl;
2019
use common_datavalues::DataValue;
20+
use common_datavalues::NullType;
2121
use common_exception::Result;
2222
use common_functions::scalars::Function;
2323
use common_functions::scalars::FunctionContext;
2424

25-
use crate::evaluator::eval_context::EmptyEvalContext;
26-
use crate::evaluator::EvalContext;
2725
use crate::evaluator::TypedVector;
2826

2927
/// A intermediate representation of a evaluable scalar expression, with configurable
3028
/// EvalContext.
3129
#[derive(Clone)]
32-
pub enum EvalNode<VectorID> {
30+
pub enum EvalNode {
3331
Function {
3432
func: Box<dyn Function>,
3533
args: Vec<Self>,
@@ -41,48 +39,66 @@ pub enum EvalNode<VectorID> {
4139
data_type: DataTypeImpl,
4240
},
4341
Variable {
44-
id: VectorID,
42+
name: String,
43+
},
44+
IndexedVariable {
45+
index: usize,
4546
},
4647
}
4748

48-
impl<VectorID> EvalNode<VectorID>
49-
where VectorID: PartialEq + Eq + Clone + Debug
50-
{
51-
/// Evaluate with given context, which is typically a `DataBlock`
52-
pub fn eval(
53-
&self,
54-
func_ctx: &FunctionContext,
55-
eval_ctx: &impl EvalContext<VectorID = VectorID>,
56-
) -> Result<TypedVector> {
49+
impl EvalNode {
50+
pub fn eval(&self, func_ctx: &FunctionContext, data_block: &DataBlock) -> Result<TypedVector> {
5751
match &self {
5852
EvalNode::Function { func, args } => {
5953
let args = args
6054
.iter()
6155
.map(|arg| {
62-
let vector = arg.eval(func_ctx, eval_ctx)?;
56+
let vector = arg.eval(func_ctx, data_block)?;
6357
Ok(ColumnWithField::new(
6458
vector.vector,
6559
DataField::new("", vector.logical_type),
6660
))
6761
})
6862
.collect::<Result<Vec<_>>>()?;
6963
Ok(TypedVector::new(
70-
func.eval(func_ctx.clone(), &args, eval_ctx.tuple_count())?,
64+
func.eval(func_ctx.clone(), &args, data_block.num_rows())?,
7165
func.return_type(),
7266
))
7367
}
7468
EvalNode::Constant { value, data_type } => {
75-
let vector = value.as_const_column(data_type, eval_ctx.tuple_count())?;
69+
let vector = value.as_const_column(data_type, data_block.num_rows())?;
7670
Ok(TypedVector::new(vector, data_type.clone()))
7771
}
78-
EvalNode::Variable { id } => eval_ctx.get_vector(id),
72+
EvalNode::Variable { name } => {
73+
let column = data_block.try_column_by_name(name)?;
74+
let data_type = data_block
75+
.schema()
76+
.field_with_name(name)?
77+
.data_type()
78+
.clone();
79+
Ok(TypedVector {
80+
vector: column.clone(),
81+
logical_type: data_type,
82+
})
83+
}
84+
EvalNode::IndexedVariable { index } => {
85+
let column = data_block.column(*index);
86+
let data_type = data_block.schema().field(*index).data_type().clone();
87+
Ok(TypedVector {
88+
vector: column.clone(),
89+
logical_type: data_type,
90+
})
91+
}
7992
}
8093
}
8194

8295
/// Try to evaluate as a constant expression
8396
pub fn try_eval_const(&self, func_ctx: &FunctionContext) -> Result<(DataValue, DataTypeImpl)> {
84-
let eval_ctx = EmptyEvalContext::<VectorID>::new();
85-
let vector = self.eval(func_ctx, &eval_ctx)?;
97+
let dummy_column = DataValue::Null.as_const_column(&NullType::new_impl(), 1)?;
98+
let mut dummy_data_block = DataBlock::empty();
99+
dummy_data_block = dummy_data_block
100+
.add_column(dummy_column, DataField::new("dummy", NullType::new_impl()))?;
101+
let vector = self.eval(func_ctx, &dummy_data_block)?;
86102
debug_assert!(vector.vector.len() == 1);
87103
Ok((vector.vector.get(0), vector.logical_type))
88104
}

src/query/service/src/evaluator/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,39 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod eval_context;
1615
mod eval_node;
1716
mod physical_scalar;
1817
mod scalar;
1918

20-
pub use eval_context::EvalContext;
21-
pub use eval_context::TypedVector;
19+
use common_datavalues::ColumnRef;
20+
use common_datavalues::DataTypeImpl;
2221
pub use eval_node::EvalNode;
2322

2423
pub struct Evaluator;
24+
25+
#[derive(Clone, Debug)]
26+
pub struct TypedVector {
27+
pub(super) vector: ColumnRef,
28+
pub(super) logical_type: DataTypeImpl,
29+
}
30+
31+
impl TypedVector {
32+
pub fn new(data: ColumnRef, logical_type: DataTypeImpl) -> Self {
33+
Self {
34+
vector: data,
35+
logical_type,
36+
}
37+
}
38+
39+
pub fn logical_type(&self) -> DataTypeImpl {
40+
self.logical_type.clone()
41+
}
42+
43+
pub fn physical_type(&self) -> DataTypeImpl {
44+
self.vector.data_type()
45+
}
46+
47+
pub fn vector(&self) -> &ColumnRef {
48+
&self.vector
49+
}
50+
}

src/query/service/src/evaluator/physical_scalar.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,17 @@ use crate::evaluator::Evaluator;
2323
use crate::sql::executor::PhysicalScalar;
2424

2525
impl Evaluator {
26-
pub fn eval_physical_scalars<VectorID>(
27-
physical_scalars: &[PhysicalScalar],
28-
) -> Result<Vec<EvalNode<VectorID>>>
29-
where VectorID: From<String> {
26+
pub fn eval_physical_scalars(physical_scalars: &[PhysicalScalar]) -> Result<Vec<EvalNode>> {
3027
physical_scalars
3128
.iter()
32-
.map(Evaluator::eval_physical_scalar::<VectorID>)
29+
.map(Evaluator::eval_physical_scalar)
3330
.collect::<Result<_>>()
3431
}
3532

36-
pub fn eval_physical_scalar<VectorID>(
37-
physical_scalar: &PhysicalScalar,
38-
) -> Result<EvalNode<VectorID>>
39-
where VectorID: From<String> {
33+
pub fn eval_physical_scalar(physical_scalar: &PhysicalScalar) -> Result<EvalNode> {
4034
match physical_scalar {
4135
PhysicalScalar::Variable { column_id, .. } => Ok(EvalNode::Variable {
42-
id: column_id.clone().into(),
36+
name: column_id.clone(),
4337
}),
4438
PhysicalScalar::Constant { value, data_type } => Ok(EvalNode::Constant {
4539
value: value.clone(),
@@ -66,6 +60,9 @@ impl Evaluator {
6660
args: vec![Self::eval_physical_scalar(input)?],
6761
})
6862
}
63+
PhysicalScalar::IndexedVariable { index, .. } => {
64+
Ok(EvalNode::IndexedVariable { index: *index })
65+
}
6966
}
7067
}
7168
}

src/query/service/src/evaluator/scalar.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ use crate::sql::plans::Scalar;
2525
use crate::sql::plans::ScalarExpr;
2626

2727
impl Evaluator {
28-
pub fn eval_scalar<VectorID>(scalar: &Scalar) -> Result<EvalNode<VectorID>>
29-
where VectorID: From<String> {
28+
pub fn eval_scalar(scalar: &Scalar) -> Result<EvalNode> {
3029
match scalar {
3130
Scalar::BoundColumnRef(column_ref) => Ok(EvalNode::Variable {
32-
id: column_ref.column.index.to_string().into(),
31+
name: column_ref.column.index.to_string(),
3332
}),
3433
Scalar::ConstantExpr(constant) => Ok(EvalNode::Constant {
3534
value: constant.value.clone(),
@@ -62,7 +61,7 @@ impl Evaluator {
6261
Ok(EvalNode::Function { func, args })
6362
}
6463
Scalar::FunctionCall(func) => {
65-
let args: Vec<EvalNode<VectorID>> = func
64+
let args: Vec<EvalNode> = func
6665
.arguments
6766
.iter()
6867
.map(Self::eval_scalar)

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@ use parking_lot::RwLock;
1919
use crate::evaluator::EvalNode;
2020
use crate::evaluator::Evaluator;
2121
use crate::pipelines::processors::transforms::hash_join::MarkJoinDesc;
22-
use crate::sql::executor::ColumnID;
2322
use crate::sql::executor::HashJoin;
2423
use crate::sql::executor::PhysicalScalar;
2524
use crate::sql::plans::JoinType;
2625

2726
pub struct HashJoinDesc {
28-
pub(crate) build_keys: Vec<EvalNode<ColumnID>>,
29-
pub(crate) probe_keys: Vec<EvalNode<ColumnID>>,
27+
pub(crate) build_keys: Vec<EvalNode>,
28+
pub(crate) probe_keys: Vec<EvalNode>,
3029
pub(crate) join_type: JoinType,
31-
pub(crate) other_predicate: Option<EvalNode<ColumnID>>,
30+
pub(crate) other_predicate: Option<EvalNode>,
3231
pub(crate) marker_join_desc: MarkJoinDesc,
3332
/// Whether the Join are derived from correlated subquery.
3433
pub(crate) from_correlated_subquery: bool,

src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use crate::evaluator::EvalNode;
4141
use crate::pipelines::processors::transforms::hash_join::join_hash_table::MarkerKind;
4242
use crate::pipelines::processors::transforms::hash_join::row::RowPtr;
4343
use crate::sessions::TableContext;
44-
use crate::sql::executor::ColumnID;
4544
use crate::sql::planner::plans::JoinType;
4645
use crate::sql::plans::JoinType::Mark;
4746

@@ -602,7 +601,7 @@ impl JoinHashTable {
602601
fn get_other_filters(
603602
&self,
604603
merged_block: &DataBlock,
605-
filter: &EvalNode<ColumnID>,
604+
filter: &EvalNode,
606605
) -> Result<(Option<Bitmap>, bool, bool)> {
607606
let func_ctx = self.ctx.try_get_function_context()?;
608607
// `predicate_column` contains a column, which is a boolean column.

0 commit comments

Comments
 (0)