Skip to content

Commit 3036fa8

Browse files
authored
chore: Simplifying BlockMetaInfo Definitions (#16716)
* clean up Signed-off-by: coldWater <forsaken628@gmail.com> * boxed Signed-off-by: coldWater <forsaken628@gmail.com> --------- Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent e3a3d70 commit 3036fa8

File tree

21 files changed

+98
-362
lines changed

21 files changed

+98
-362
lines changed

src/query/catalog/src/plan/agg_index.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::fmt::Debug;
1616

17+
use databend_common_expression::local_block_meta_serde;
1718
use databend_common_expression::BlockMetaInfo;
1819
use databend_common_expression::BlockMetaInfoPtr;
1920
use databend_common_expression::RemoteExpr;
@@ -69,26 +70,10 @@ impl AggIndexMeta {
6970
}
7071
}
7172

72-
impl serde::Serialize for AggIndexMeta {
73-
fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
74-
where S: serde::Serializer {
75-
unimplemented!("Unimplemented serialize AggIndexMeta")
76-
}
77-
}
78-
79-
impl<'de> serde::Deserialize<'de> for AggIndexMeta {
80-
fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
81-
where D: serde::Deserializer<'de> {
82-
unimplemented!("Unimplemented deserialize AggIndexMeta")
83-
}
84-
}
73+
local_block_meta_serde!(AggIndexMeta);
8574

8675
#[typetag::serde(name = "agg_index_meta")]
8776
impl BlockMetaInfo for AggIndexMeta {
88-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
89-
unimplemented!("Unimplemented equals AggIndexMeta")
90-
}
91-
9277
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
9378
Box::new(self.clone())
9479
}

src/query/expression/src/block.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,29 +83,37 @@ impl BlockEntry {
8383
#[typetag::serde(tag = "type")]
8484
pub trait BlockMetaInfo: Debug + Send + Sync + Any + 'static {
8585
#[allow(clippy::borrowed_box)]
86-
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool;
86+
fn equals(&self, _info: &Box<dyn BlockMetaInfo>) -> bool {
87+
panic!(
88+
"The reason for not implementing equals is usually because the higher-level logic doesn't allow/need the meta to be compared."
89+
)
90+
}
8791

88-
fn clone_self(&self) -> Box<dyn BlockMetaInfo>;
92+
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
93+
panic!(
94+
"The reason for not implementing clone_self is usually because the higher-level logic doesn't allow/need the associated block to be cloned."
95+
)
96+
}
8997
}
9098

91-
pub trait BlockMetaInfoDowncast: Sized {
92-
fn downcast_from(boxed: BlockMetaInfoPtr) -> Option<Self>;
93-
94-
fn downcast_ref_from(boxed: &BlockMetaInfoPtr) -> Option<&Self>;
95-
}
99+
pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
100+
fn boxed(self) -> BlockMetaInfoPtr {
101+
Box::new(self)
102+
}
96103

97-
impl<T: BlockMetaInfo> BlockMetaInfoDowncast for T {
98104
fn downcast_from(boxed: BlockMetaInfoPtr) -> Option<Self> {
99105
let boxed: Box<dyn Any> = boxed;
100106
boxed.downcast().ok().map(|x| *x)
101107
}
102108

103109
fn downcast_ref_from(boxed: &BlockMetaInfoPtr) -> Option<&Self> {
104-
let boxed: &dyn Any = boxed.as_ref();
110+
let boxed = boxed.as_ref() as &dyn Any;
105111
boxed.downcast_ref()
106112
}
107113
}
108114

115+
impl<T: BlockMetaInfo> BlockMetaInfoDowncast for T {}
116+
109117
impl DataBlock {
110118
#[inline]
111119
pub fn new(columns: Vec<BlockEntry>, num_rows: usize) -> Self {
@@ -633,8 +641,8 @@ impl Eq for Box<dyn BlockMetaInfo> {}
633641

634642
impl PartialEq for Box<dyn BlockMetaInfo> {
635643
fn eq(&self, other: &Self) -> bool {
636-
let this_any: &dyn Any = self.as_ref();
637-
let other_any: &dyn Any = other.as_ref();
644+
let this_any = self.as_ref() as &dyn Any;
645+
let other_any = other.as_ref() as &dyn Any;
638646

639647
match this_any.type_id() == other_any.type_id() {
640648
true => self.equals(other),
@@ -668,3 +676,28 @@ fn check_type(data_type: &DataType, value: &Value<AnyType>) {
668676
Value::Column(c) => assert_eq!(&c.data_type(), data_type),
669677
}
670678
}
679+
680+
#[macro_export]
681+
macro_rules! local_block_meta_serde {
682+
($T:ty) => {
683+
impl serde::Serialize for $T {
684+
fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
685+
where S: serde::Serializer {
686+
unreachable!(
687+
"{} must not be exchanged between multiple nodes.",
688+
stringify!($T)
689+
)
690+
}
691+
}
692+
693+
impl<'de> serde::Deserialize<'de> for $T {
694+
fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
695+
where D: serde::Deserializer<'de> {
696+
unreachable!(
697+
"{} must not be exchanged between multiple nodes.",
698+
stringify!($T)
699+
)
700+
}
701+
}
702+
};
703+
}

src/query/pipeline/core/tests/it/pipelines/processors/port_test.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ use std::sync::Barrier;
1818
use databend_common_base::runtime::Thread;
1919
use databend_common_exception::ErrorCode;
2020
use databend_common_exception::Result;
21+
use databend_common_expression::local_block_meta_serde;
2122
use databend_common_expression::BlockMetaInfo;
2223
use databend_common_expression::DataBlock;
2324
use databend_common_pipeline_core::processors::connect;
2425
use databend_common_pipeline_core::processors::InputPort;
2526
use databend_common_pipeline_core::processors::OutputPort;
26-
use serde::Deserializer;
27-
use serde::Serializer;
2827

2928
#[derive(Clone, Debug)]
3029
struct TestDataMeta {
@@ -43,26 +42,10 @@ impl TestDataMeta {
4342
}
4443
}
4544

46-
impl serde::Serialize for TestDataMeta {
47-
fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
48-
where S: Serializer {
49-
unimplemented!("Serialize is unimplemented for TestDataMeta")
50-
}
51-
}
52-
53-
impl<'de> serde::Deserialize<'de> for TestDataMeta {
54-
fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
55-
where D: Deserializer<'de> {
56-
unimplemented!("Deserialize is unimplemented for TestDataMeta")
57-
}
58-
}
45+
local_block_meta_serde!(TestDataMeta);
5946

6047
#[typetag::serde(name = "test_data_meta")]
6148
impl BlockMetaInfo for TestDataMeta {
62-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
63-
unimplemented!("equals is unimplemented for TestDataMeta")
64-
}
65-
6649
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
6750
Box::new(self.clone())
6851
}

src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ use std::sync::Arc;
1818

1919
use databend_common_exception::Result;
2020
use databend_common_expression::BlockMetaInfo;
21-
use databend_common_expression::BlockMetaInfoDowncast;
22-
use databend_common_expression::BlockMetaInfoPtr;
2321
use databend_common_expression::DataBlock;
2422
use databend_common_expression::DataSchemaRef;
2523
use databend_common_expression::SortColumnDescription;
@@ -240,19 +238,5 @@ pub struct SortTaskMeta {
240238
pub input: usize,
241239
}
242240

243-
impl SortTaskMeta {
244-
pub fn as_meta(self) -> BlockMetaInfoPtr {
245-
Box::new(self)
246-
}
247-
}
248-
249241
#[typetag::serde(name = "sort_task")]
250-
impl BlockMetaInfo for SortTaskMeta {
251-
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
252-
SortTaskMeta::downcast_ref_from(info).map_or(false, |info| self == info)
253-
}
254-
255-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
256-
Box::new(*self)
257-
}
258-
}
242+
impl BlockMetaInfo for SortTaskMeta {}

src/query/pipeline/transforms/src/processors/transforms/sort/spill.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,36 +12,25 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_expression::local_block_meta_serde;
1516
use databend_common_expression::BlockMetaInfo;
1617

1718
use super::super::SortSpillParams;
1819

1920
/// Mark a partially sorted [`DataBlock`] as a block needs to be spilled.
20-
#[derive(serde::Serialize, serde::Deserialize, Debug)]
21+
#[derive(Debug)]
2122
pub struct SortSpillMetaWithParams(pub SortSpillParams);
2223

23-
#[typetag::serde(name = "sort_spill")]
24-
impl BlockMetaInfo for SortSpillMetaWithParams {
25-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
26-
unimplemented!("Unimplemented equals SortSpillMetaWithParams")
27-
}
24+
local_block_meta_serde!(SortSpillMetaWithParams);
2825

29-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
30-
unimplemented!("Unimplemented clone SortSpillMetaWithParams")
31-
}
32-
}
26+
#[typetag::serde(name = "sort_spill")]
27+
impl BlockMetaInfo for SortSpillMetaWithParams {}
3328

3429
/// Mark a partially sorted [`DataBlock`] as a block needs to be spilled.
35-
#[derive(serde::Serialize, serde::Deserialize, Debug)]
30+
#[derive(Debug)]
3631
pub struct SortSpillMeta {}
3732

33+
local_block_meta_serde!(SortSpillMeta);
34+
3835
#[typetag::serde(name = "sort_spill")]
39-
impl BlockMetaInfo for SortSpillMeta {
40-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
41-
unimplemented!("Unimplemented equals SortSpillMeta")
42-
}
43-
44-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
45-
unimplemented!("Unimplemented clone SortSpillMeta")
46-
}
47-
}
36+
impl BlockMetaInfo for SortSpillMeta {}

src/query/pipeline/transforms/src/processors/transforms/transform_compact_block.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121

2222
use databend_common_exception::ErrorCode;
2323
use databend_common_exception::Result;
24+
use databend_common_expression::local_block_meta_serde;
2425
use databend_common_expression::BlockMetaInfo;
2526
use databend_common_expression::DataBlock;
2627

@@ -42,30 +43,10 @@ impl Debug for BlockCompactMeta {
4243
}
4344
}
4445

45-
impl serde::Serialize for BlockCompactMeta {
46-
fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
47-
where S: serde::Serializer {
48-
unimplemented!("Unimplemented serialize BlockCompactMeta")
49-
}
50-
}
51-
52-
impl<'de> serde::Deserialize<'de> for BlockCompactMeta {
53-
fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
54-
where D: serde::Deserializer<'de> {
55-
unimplemented!("Unimplemented deserialize BlockCompactMeta")
56-
}
57-
}
46+
local_block_meta_serde!(BlockCompactMeta);
5847

5948
#[typetag::serde(name = "block_compact")]
60-
impl BlockMetaInfo for BlockCompactMeta {
61-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
62-
unimplemented!("Unimplemented equals BlockCompactMeta")
63-
}
64-
65-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
66-
unimplemented!("Unimplemented clone BlockCompactMeta")
67-
}
68-
}
49+
impl BlockMetaInfo for BlockCompactMeta {}
6950

7051
#[derive(Default)]
7152
pub struct TransformCompactBlock {

src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ where A: SortAlgorithm + 'static
532532
total: task.total,
533533
input: 0,
534534
}
535-
.as_meta();
535+
.boxed();
536536
self.output_data.push_back(block.add_meta(Some(meta))?);
537537
}
538538
debug_assert_eq!(rows, task.total);

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,4 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> BlockMetaInfo
246246
fn typetag_name(&self) -> &'static str {
247247
unimplemented!("AggregateMeta does not support exchanging between multiple nodes")
248248
}
249-
250-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
251-
unimplemented!("Unimplemented equals for AggregateMeta")
252-
}
253-
254-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
255-
unimplemented!("Unimplemented clone for AggregateMeta")
256-
}
257249
}

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_base::runtime::profile::ProfileStatisticsName;
2626
use databend_common_catalog::table_context::TableContext;
2727
use databend_common_exception::Result;
2828
use databend_common_expression::arrow::serialize_column;
29+
use databend_common_expression::local_block_meta_serde;
2930
use databend_common_expression::types::ArgType;
3031
use databend_common_expression::types::ArrayType;
3132
use databend_common_expression::types::Int64Type;
@@ -140,30 +141,10 @@ impl Debug for FlightSerializedMeta {
140141
}
141142
}
142143

143-
impl serde::Serialize for FlightSerializedMeta {
144-
fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
145-
where S: serde::Serializer {
146-
unimplemented!("Unimplemented serialize FlightSerializedMeta")
147-
}
148-
}
149-
150-
impl<'de> serde::Deserialize<'de> for FlightSerializedMeta {
151-
fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
152-
where D: serde::Deserializer<'de> {
153-
unimplemented!("Unimplemented deserialize FlightSerializedMeta")
154-
}
155-
}
144+
local_block_meta_serde!(FlightSerializedMeta);
156145

157146
#[typetag::serde(name = "exchange_shuffle")]
158-
impl BlockMetaInfo for FlightSerializedMeta {
159-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
160-
unimplemented!("Unimplemented equals FlightSerializedMeta")
161-
}
162-
163-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
164-
unimplemented!("Unimplemented clone FlightSerializedMeta")
165-
}
166-
}
147+
impl BlockMetaInfo for FlightSerializedMeta {}
167148

168149
impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
169150
for TransformExchangeGroupBySerializer<Method>

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_meta.rs

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::fmt::Debug;
1616
use std::fmt::Formatter;
1717

18+
use databend_common_expression::local_block_meta_serde;
1819
use databend_common_expression::BlockMetaInfo;
1920
use databend_common_expression::BlockMetaInfoPtr;
2021
use databend_common_expression::DataBlock;
@@ -30,40 +31,13 @@ impl WindowPartitionMeta {
3031
}
3132
}
3233

33-
impl serde::Serialize for WindowPartitionMeta {
34-
fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
35-
where S: serde::Serializer {
36-
unreachable!("WindowPartitionMeta does not support exchanging between multiple nodes")
37-
}
38-
}
39-
40-
impl<'de> serde::Deserialize<'de> for WindowPartitionMeta {
41-
fn deserialize<D>(_: D) -> Result<Self, D::Error>
42-
where D: serde::Deserializer<'de> {
43-
unreachable!("WindowPartitionMeta does not support exchanging between multiple nodes")
44-
}
45-
}
34+
local_block_meta_serde!(WindowPartitionMeta);
4635

4736
impl Debug for WindowPartitionMeta {
4837
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
4938
f.debug_struct("WindowPartitionMeta").finish()
5039
}
5140
}
5241

53-
impl BlockMetaInfo for WindowPartitionMeta {
54-
fn typetag_deserialize(&self) {
55-
unimplemented!("WindowPartitionMeta does not support exchanging between multiple nodes")
56-
}
57-
58-
fn typetag_name(&self) -> &'static str {
59-
unimplemented!("WindowPartitionMeta does not support exchanging between multiple nodes")
60-
}
61-
62-
fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
63-
unimplemented!("Unimplemented equals for WindowPartitionMeta")
64-
}
65-
66-
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
67-
unimplemented!("Unimplemented clone for WindowPartitionMeta")
68-
}
69-
}
42+
#[typetag::serde(name = "window_partition")]
43+
impl BlockMetaInfo for WindowPartitionMeta {}

0 commit comments

Comments
 (0)