@@ -18,7 +18,9 @@ use std::sync::Arc;
18
18
use common_datablocks:: DataBlock ;
19
19
use common_exception:: Result ;
20
20
use common_meta_app:: schema:: TableInfo ;
21
+ use common_pipeline_sources:: processors:: sources:: EmptySource ;
21
22
use common_planners:: Extras ;
23
+ use common_planners:: PartInfo ;
22
24
use common_planners:: Partitions ;
23
25
use common_planners:: ReadDataSourcePlan ;
24
26
use common_planners:: Statistics ;
@@ -34,6 +36,23 @@ use crate::pipelines::Pipeline;
34
36
use crate :: sessions:: TableContext ;
35
37
use crate :: storages:: Table ;
36
38
39
+ #[ derive( serde:: Serialize , serde:: Deserialize , PartialEq , Eq ) ]
40
+ pub struct SystemTablePart ;
41
+
42
+ #[ typetag:: serde( name = "system" ) ]
43
+ impl PartInfo for SystemTablePart {
44
+ fn as_any ( & self ) -> & dyn Any {
45
+ self
46
+ }
47
+
48
+ fn equals ( & self , info : & Box < dyn PartInfo > ) -> bool {
49
+ match info. as_any ( ) . downcast_ref :: < SystemTablePart > ( ) {
50
+ None => false ,
51
+ Some ( other) => self == other,
52
+ }
53
+ }
54
+ }
55
+
37
56
pub trait SyncSystemTable : Send + Sync {
38
57
const NAME : & ' static str ;
39
58
@@ -45,7 +64,9 @@ pub trait SyncSystemTable: Send + Sync {
45
64
_ctx : Arc < dyn TableContext > ,
46
65
_push_downs : Option < Extras > ,
47
66
) -> Result < ( Statistics , Partitions ) > {
48
- Ok ( ( Statistics :: default ( ) , vec ! [ ] ) )
67
+ Ok ( ( Statistics :: default ( ) , vec ! [ Arc :: new( Box :: new(
68
+ SystemTablePart ,
69
+ ) ) ] ) )
49
70
}
50
71
}
51
72
@@ -84,9 +105,21 @@ impl<TTable: 'static + SyncSystemTable> Table for SyncOneBlockSystemTable<TTable
84
105
fn read2 (
85
106
& self ,
86
107
ctx : Arc < dyn TableContext > ,
87
- _ : & ReadDataSourcePlan ,
108
+ plan : & ReadDataSourcePlan ,
88
109
pipeline : & mut Pipeline ,
89
110
) -> Result < ( ) > {
111
+ // avoid duplicate read in cluster mode.
112
+ if plan. parts . is_empty ( ) {
113
+ let output = OutputPort :: create ( ) ;
114
+ pipeline. add_pipe ( Pipe :: SimplePipe {
115
+ inputs_port : vec ! [ ] ,
116
+ outputs_port : vec ! [ output. clone( ) ] ,
117
+ processors : vec ! [ EmptySource :: create( output) ?] ,
118
+ } ) ;
119
+
120
+ return Ok ( ( ) ) ;
121
+ }
122
+
90
123
let output = OutputPort :: create ( ) ;
91
124
let inner_table = self . inner_table . clone ( ) ;
92
125
pipeline. add_pipe ( Pipe :: SimplePipe {
@@ -150,7 +183,9 @@ pub trait AsyncSystemTable: Send + Sync {
150
183
_ctx : Arc < dyn TableContext > ,
151
184
_push_downs : Option < Extras > ,
152
185
) -> Result < ( Statistics , Partitions ) > {
153
- Ok ( ( Statistics :: default ( ) , vec ! [ ] ) )
186
+ Ok ( ( Statistics :: default ( ) , vec ! [ Arc :: new( Box :: new(
187
+ SystemTablePart ,
188
+ ) ) ] ) )
154
189
}
155
190
}
156
191
0 commit comments