@@ -20,7 +20,6 @@ use common_base::base::ProgressValues;
20
20
use common_catalog:: table_context:: TableContext ;
21
21
use common_datablocks:: DataBlock ;
22
22
use common_datavalues:: ColumnRef ;
23
- use common_datavalues:: DataSchemaRef ;
24
23
use common_datavalues:: DataSchemaRefExt ;
25
24
use common_exception:: ErrorCode ;
26
25
use common_exception:: Result ;
@@ -84,8 +83,6 @@ impl FuseTable {
84
83
) -> Result < ( ) > {
85
84
let table_schema = self . table_info . schema ( ) ;
86
85
let projection = self . projection_of_push_downs ( & plan. push_downs ) ;
87
- let output_schema = projection. project_schema ( & table_schema) ;
88
- let output_schema = Arc :: new ( output_schema) ;
89
86
let output_reader = self . create_block_reader ( & ctx, projection) ?; // for deserialize output blocks
90
87
91
88
let ( output_reader, prewhere_reader, prewhere_filter, remain_reader) =
@@ -139,7 +136,6 @@ impl FuseTable {
139
136
FuseTableSource :: create (
140
137
ctx. clone ( ) ,
141
138
output,
142
- output_schema. clone ( ) ,
143
139
output_reader. clone ( ) ,
144
140
prewhere_reader. clone ( ) ,
145
141
prewhere_filter. clone ( ) ,
@@ -174,7 +170,6 @@ struct FuseTableSource {
174
170
ctx : Arc < dyn TableContext > ,
175
171
scan_progress : Arc < Progress > ,
176
172
output : Arc < OutputPort > ,
177
- output_schema : DataSchemaRef ,
178
173
output_reader : Arc < BlockReader > ,
179
174
180
175
prewhere_reader : Arc < BlockReader > ,
@@ -186,7 +181,6 @@ impl FuseTableSource {
186
181
pub fn create (
187
182
ctx : Arc < dyn TableContext > ,
188
183
output : Arc < OutputPort > ,
189
- output_schema : DataSchemaRef ,
190
184
output_reader : Arc < BlockReader > ,
191
185
prewhere_reader : Arc < BlockReader > ,
192
186
prewhere_filter : Arc < Option < ExpressionExecutor > > ,
@@ -200,7 +194,6 @@ impl FuseTableSource {
200
194
output,
201
195
scan_progress,
202
196
state : State :: Finish ,
203
- output_schema,
204
197
output_reader,
205
198
prewhere_reader,
206
199
prewhere_filter,
@@ -211,7 +204,6 @@ impl FuseTableSource {
211
204
output,
212
205
scan_progress,
213
206
state : State :: ReadDataPrewhere ( partitions. remove ( 0 ) ) ,
214
- output_schema,
215
207
output_reader,
216
208
prewhere_reader,
217
209
prewhere_filter,
@@ -222,7 +214,8 @@ impl FuseTableSource {
222
214
223
215
fn generate_one_block ( & mut self , block : DataBlock ) -> Result < ( ) > {
224
216
let mut partitions = self . ctx . try_get_partitions ( 1 ) ?;
225
-
217
+ // resort and prune columns
218
+ let block = block. resort ( self . output_reader . schema ( ) ) ?;
226
219
self . state = match partitions. is_empty ( ) {
227
220
true => State :: Generated ( None , block) ,
228
221
false => State :: Generated ( Some ( partitions. remove ( 0 ) ) , block) ,
@@ -235,11 +228,11 @@ impl FuseTableSource {
235
228
self . state = match partitions. is_empty ( ) {
236
229
true => State :: Generated (
237
230
None ,
238
- DataBlock :: empty_with_schema ( self . output_schema . clone ( ) ) ,
231
+ DataBlock :: empty_with_schema ( self . output_reader . schema ( ) ) ,
239
232
) ,
240
233
false => State :: Generated (
241
234
Some ( partitions. remove ( 0 ) ) ,
242
- DataBlock :: empty_with_schema ( self . output_schema . clone ( ) ) ,
235
+ DataBlock :: empty_with_schema ( self . output_reader . schema ( ) ) ,
243
236
) ,
244
237
} ;
245
238
Ok ( ( ) )
@@ -312,7 +305,7 @@ impl Processor for FuseTableSource {
312
305
prewhere_blocks =
313
306
prewhere_blocks. add_column ( col. clone ( ) , field. clone ( ) ) ?;
314
307
}
315
- prewhere_blocks. resort ( self . output_reader . schema ( ) ) ?
308
+ prewhere_blocks
316
309
} else {
317
310
return Err ( ErrorCode :: LogicalError ( "It's a bug. Need remain reader" ) ) ;
318
311
} ;
0 commit comments