Skip to content

Commit 1fee5d2

Browse files
committed
add indexed variable in PhysicalScalar
1 parent 1a30d62 commit 1fee5d2

File tree

16 files changed

+105
-175
lines changed

16 files changed

+105
-175
lines changed

src/query/service/src/api/rpc/flight_scatter_hash_v2.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@ use crate::api::rpc::flight_scatter::FlightScatter;
2727
use crate::evaluator::EvalNode;
2828
use crate::evaluator::Evaluator;
2929
use crate::evaluator::TypedVector;
30-
use crate::sql::executor::ColumnID;
3130
use crate::sql::executor::PhysicalScalar;
3231

3332
#[derive(Clone)]
3433
pub struct HashFlightScatterV2 {
3534
func_ctx: FunctionContext,
36-
hash_keys: Vec<EvalNode<ColumnID>>,
35+
hash_keys: Vec<EvalNode>,
3736
hash_functions: Vec<Box<dyn Function>>,
3837
scatter_size: usize,
3938
}
@@ -118,7 +117,7 @@ impl HashFlightScatterV2 {
118117
struct OneHashKeyFlightScatter {
119118
scatter_size: usize,
120119
func_ctx: FunctionContext,
121-
indices_scalar: EvalNode<ColumnID>,
120+
indices_scalar: EvalNode,
122121
}
123122

124123
impl OneHashKeyFlightScatter {
@@ -129,7 +128,7 @@ impl OneHashKeyFlightScatter {
129128
) -> Result<Box<dyn FlightScatter>> {
130129
let hash_key = Evaluator::eval_physical_scalar(scalar)?;
131130

132-
let mut sip_hash = EvalNode::<ColumnID>::Function {
131+
let mut sip_hash = EvalNode::Function {
133132
args: vec![hash_key],
134133
func: FunctionFactory::instance().get("sipHash", &[&scalar.data_type()])?,
135134
};
@@ -146,7 +145,7 @@ impl OneHashKeyFlightScatter {
146145
Ok(Box::new(OneHashKeyFlightScatter {
147146
scatter_size,
148147
func_ctx,
149-
indices_scalar: EvalNode::<ColumnID>::Function {
148+
indices_scalar: EvalNode::Function {
150149
args: vec![sip_hash, EvalNode::Constant {
151150
value: DataValue::UInt64(scatter_size as u64),
152151
data_type: DataTypeImpl::UInt64(UInt64Type::new()),

src/query/service/src/evaluator/eval_context.rs

Lines changed: 0 additions & 101 deletions
This file was deleted.

src/query/service/src/evaluator/eval_node.rs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::fmt::Debug;
16-
15+
use common_datablocks::DataBlock;
1716
use common_datavalues::ColumnWithField;
1817
use common_datavalues::DataField;
1918
use common_datavalues::DataTypeImpl;
@@ -22,14 +21,12 @@ use common_exception::Result;
2221
use common_functions::scalars::Function;
2322
use common_functions::scalars::FunctionContext;
2423

25-
use crate::evaluator::eval_context::EmptyEvalContext;
26-
use crate::evaluator::EvalContext;
2724
use crate::evaluator::TypedVector;
2825

2926
/// A intermediate representation of a evaluable scalar expression, with configurable
3027
/// EvalContext.
3128
#[derive(Clone)]
32-
pub enum EvalNode<VectorID> {
29+
pub enum EvalNode {
3330
Function {
3431
func: Box<dyn Function>,
3532
args: Vec<Self>,
@@ -41,48 +38,63 @@ pub enum EvalNode<VectorID> {
4138
data_type: DataTypeImpl,
4239
},
4340
Variable {
44-
id: VectorID,
41+
name: String,
42+
},
43+
IndexedVariable {
44+
index: usize,
4545
},
4646
}
4747

48-
impl<VectorID> EvalNode<VectorID>
49-
where VectorID: PartialEq + Eq + Clone + Debug
50-
{
51-
/// Evaluate with given context, which is typically a `DataBlock`
52-
pub fn eval(
53-
&self,
54-
func_ctx: &FunctionContext,
55-
eval_ctx: &impl EvalContext<VectorID = VectorID>,
56-
) -> Result<TypedVector> {
48+
impl EvalNode {
49+
pub fn eval(&self, func_ctx: &FunctionContext, data_block: &DataBlock) -> Result<TypedVector> {
5750
match &self {
5851
EvalNode::Function { func, args } => {
5952
let args = args
6053
.iter()
6154
.map(|arg| {
62-
let vector = arg.eval(func_ctx, eval_ctx)?;
55+
let vector = arg.eval(func_ctx, data_block)?;
6356
Ok(ColumnWithField::new(
6457
vector.vector,
6558
DataField::new("", vector.logical_type),
6659
))
6760
})
6861
.collect::<Result<Vec<_>>>()?;
6962
Ok(TypedVector::new(
70-
func.eval(func_ctx.clone(), &args, eval_ctx.tuple_count())?,
63+
func.eval(func_ctx.clone(), &args, data_block.num_rows())?,
7164
func.return_type(),
7265
))
7366
}
7467
EvalNode::Constant { value, data_type } => {
75-
let vector = value.as_const_column(data_type, eval_ctx.tuple_count())?;
68+
let vector = value.as_const_column(data_type, data_block.num_rows())?;
7669
Ok(TypedVector::new(vector, data_type.clone()))
7770
}
78-
EvalNode::Variable { id } => eval_ctx.get_vector(id),
71+
EvalNode::Variable { name } => {
72+
let column = data_block.try_column_by_name(name)?;
73+
let data_type = data_block
74+
.schema()
75+
.field_with_name(name)?
76+
.data_type()
77+
.clone();
78+
Ok(TypedVector {
79+
vector: column.clone(),
80+
logical_type: data_type,
81+
})
82+
}
83+
EvalNode::IndexedVariable { index } => {
84+
let column = data_block.column(*index);
85+
let data_type = data_block.schema().field(*index).data_type().clone();
86+
Ok(TypedVector {
87+
vector: column.clone(),
88+
logical_type: data_type,
89+
})
90+
}
7991
}
8092
}
8193

8294
/// Try to evaluate as a constant expression
8395
pub fn try_eval_const(&self, func_ctx: &FunctionContext) -> Result<(DataValue, DataTypeImpl)> {
84-
let eval_ctx = EmptyEvalContext::<VectorID>::new();
85-
let vector = self.eval(func_ctx, &eval_ctx)?;
96+
let dummy_data_block = DataBlock::empty();
97+
let vector = self.eval(func_ctx, &dummy_data_block)?;
8698
debug_assert!(vector.vector.len() == 1);
8799
Ok((vector.vector.get(0), vector.logical_type))
88100
}

src/query/service/src/evaluator/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,39 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod eval_context;
1615
mod eval_node;
1716
mod physical_scalar;
1817
mod scalar;
1918

20-
pub use eval_context::EvalContext;
21-
pub use eval_context::TypedVector;
19+
use common_datavalues::ColumnRef;
20+
use common_datavalues::DataTypeImpl;
2221
pub use eval_node::EvalNode;
2322

2423
pub struct Evaluator;
24+
25+
#[derive(Clone, Debug)]
26+
pub struct TypedVector {
27+
pub(super) vector: ColumnRef,
28+
pub(super) logical_type: DataTypeImpl,
29+
}
30+
31+
impl TypedVector {
32+
pub fn new(data: ColumnRef, logical_type: DataTypeImpl) -> Self {
33+
Self {
34+
vector: data,
35+
logical_type,
36+
}
37+
}
38+
39+
pub fn logical_type(&self) -> DataTypeImpl {
40+
self.logical_type.clone()
41+
}
42+
43+
pub fn physical_type(&self) -> DataTypeImpl {
44+
self.vector.data_type()
45+
}
46+
47+
pub fn vector(&self) -> &ColumnRef {
48+
&self.vector
49+
}
50+
}

src/query/service/src/evaluator/physical_scalar.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,17 @@ use crate::evaluator::Evaluator;
2323
use crate::sql::executor::PhysicalScalar;
2424

2525
impl Evaluator {
26-
pub fn eval_physical_scalars<VectorID>(
27-
physical_scalars: &[PhysicalScalar],
28-
) -> Result<Vec<EvalNode<VectorID>>>
29-
where VectorID: From<String> {
26+
pub fn eval_physical_scalars(physical_scalars: &[PhysicalScalar]) -> Result<Vec<EvalNode>> {
3027
physical_scalars
3128
.iter()
32-
.map(Evaluator::eval_physical_scalar::<VectorID>)
29+
.map(Evaluator::eval_physical_scalar)
3330
.collect::<Result<_>>()
3431
}
3532

36-
pub fn eval_physical_scalar<VectorID>(
37-
physical_scalar: &PhysicalScalar,
38-
) -> Result<EvalNode<VectorID>>
39-
where VectorID: From<String> {
33+
pub fn eval_physical_scalar(physical_scalar: &PhysicalScalar) -> Result<EvalNode> {
4034
match physical_scalar {
4135
PhysicalScalar::Variable { column_id, .. } => Ok(EvalNode::Variable {
42-
id: column_id.clone().into(),
36+
name: column_id.clone(),
4337
}),
4438
PhysicalScalar::Constant { value, data_type } => Ok(EvalNode::Constant {
4539
value: value.clone(),
@@ -66,6 +60,9 @@ impl Evaluator {
6660
args: vec![Self::eval_physical_scalar(input)?],
6761
})
6862
}
63+
PhysicalScalar::IndexedVariable { index, .. } => {
64+
Ok(EvalNode::IndexedVariable { index: *index })
65+
}
6966
}
7067
}
7168
}

src/query/service/src/evaluator/scalar.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ use crate::sql::plans::Scalar;
2525
use crate::sql::plans::ScalarExpr;
2626

2727
impl Evaluator {
28-
pub fn eval_scalar<VectorID>(scalar: &Scalar) -> Result<EvalNode<VectorID>>
29-
where VectorID: From<String> {
28+
pub fn eval_scalar(scalar: &Scalar) -> Result<EvalNode> {
3029
match scalar {
3130
Scalar::BoundColumnRef(column_ref) => Ok(EvalNode::Variable {
32-
id: column_ref.column.index.to_string().into(),
31+
name: column_ref.column.index.to_string(),
3332
}),
3433
Scalar::ConstantExpr(constant) => Ok(EvalNode::Constant {
3534
value: constant.value.clone(),
@@ -62,7 +61,7 @@ impl Evaluator {
6261
Ok(EvalNode::Function { func, args })
6362
}
6463
Scalar::FunctionCall(func) => {
65-
let args: Vec<EvalNode<VectorID>> = func
64+
let args: Vec<EvalNode> = func
6665
.arguments
6766
.iter()
6867
.map(Self::eval_scalar)

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@ use parking_lot::RwLock;
1919
use crate::evaluator::EvalNode;
2020
use crate::evaluator::Evaluator;
2121
use crate::pipelines::processors::transforms::hash_join::MarkJoinDesc;
22-
use crate::sql::executor::ColumnID;
2322
use crate::sql::executor::HashJoin;
2423
use crate::sql::executor::PhysicalScalar;
2524
use crate::sql::plans::JoinType;
2625

2726
pub struct HashJoinDesc {
28-
pub(crate) build_keys: Vec<EvalNode<ColumnID>>,
29-
pub(crate) probe_keys: Vec<EvalNode<ColumnID>>,
27+
pub(crate) build_keys: Vec<EvalNode>,
28+
pub(crate) probe_keys: Vec<EvalNode>,
3029
pub(crate) join_type: JoinType,
31-
pub(crate) other_predicate: Option<EvalNode<ColumnID>>,
30+
pub(crate) other_predicate: Option<EvalNode>,
3231
pub(crate) marker_join_desc: MarkJoinDesc,
3332
/// Whether the Join are derived from correlated subquery.
3433
pub(crate) from_correlated_subquery: bool,

src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use crate::evaluator::EvalNode;
4141
use crate::pipelines::processors::transforms::hash_join::join_hash_table::MarkerKind;
4242
use crate::pipelines::processors::transforms::hash_join::row::RowPtr;
4343
use crate::sessions::TableContext;
44-
use crate::sql::executor::ColumnID;
4544
use crate::sql::planner::plans::JoinType;
4645
use crate::sql::plans::JoinType::Mark;
4746

@@ -602,7 +601,7 @@ impl JoinHashTable {
602601
fn get_other_filters(
603602
&self,
604603
merged_block: &DataBlock,
605-
filter: &EvalNode<ColumnID>,
604+
filter: &EvalNode,
606605
) -> Result<(Option<Bitmap>, bool, bool)> {
607606
let func_ctx = self.ctx.try_get_function_context()?;
608607
// `predicate_column` contains a column, which is a boolean column.

0 commit comments

Comments
 (0)