@@ -154,11 +154,16 @@ impl FuseTable {
154
154
155
155
// Raw column data handed from a reader's `read_columns_data` to `deserialize`:
// a list of (usize, bytes) pairs.
// NOTE(review): the usize is presumably a column index/id — confirm against the reader.
type DataChunks = Vec < ( usize , Vec < u8 > ) > ;
156
156
157
// Output of the prewhere stage that is carried across states: the block
// deserialized by the prewhere reader together with the filter column computed
// from it. It is built in the `PrewhereFilter` step and consumed in the
// `Deserialize` step, where the filter is finally applied.
+ struct PrewhereData {
158
// Block deserialized from the prewhere chunks; stored unfiltered.
+ data_block : DataBlock ,
159
// Result of the prewhere predicate after `DataBlock::cast_to_nonull_boolean`.
+ filter : ColumnRef ,
160
+ }
161
+
157
162
// State machine of `FuseTableSource` (stored in `self.state`). The state
// dispatch in the `Processor` impl maps each variant to an event: the two
// `ReadData*` variants are handled asynchronously, `PrewhereFilter` and
// `Deserialize` synchronously.
enum State {
158
163
// Dispatched as `Event::Async`.
// NOTE(review): presumably reads the prewhere columns of the part — the handler is not visible here.
ReadDataPrewhere ( PartInfoPtr ) ,
159
// Old shape: carried the raw prewhere chunks and the filter so they could be
// merged with the remaining chunks and deserialized together.
- ReadDataRemain ( PartInfoPtr , DataChunks , ColumnRef ) ,
164
// New shape: carries the already-deserialized prewhere block and its filter;
// the handler reads the remaining columns with `remain_reader` and moves on
// to `Deserialize` with `Some(prewhere_data)`. Dispatched as `Event::Async`.
+ ReadDataRemain ( PartInfoPtr , PrewhereData ) ,
160
165
// Deserializes the prewhere chunks and evaluates the prewhere filter; then
// either emits the filtered block directly (no remain reader) or switches to
// `ReadDataRemain`. Dispatched as `Event::Sync`.
PrewhereFilter ( PartInfoPtr , DataChunks ) ,
161
// Old shape: the optional value was only the filter column.
- Deserialize ( PartInfoPtr , DataChunks , Option < ColumnRef > ) ,
166
// New shape: when `Some`, the chunks hold only the remaining columns; they
// are deserialized, appended to the prewhere block, re-sorted to the output
// schema and then filtered. Dispatched as `Event::Sync`.
+ Deserialize ( PartInfoPtr , DataChunks , Option < PrewhereData > ) ,
162
167
// A finished block. The visible state dispatch treats meeting this variant
// as a logical error ("It's a bug.").
Generated ( Option < PartInfoPtr > , DataBlock ) ,
163
168
// Terminal state; dispatched as `Event::Finished`. Also used as the
// placeholder left behind by `std::mem::replace` while a state is processed.
Finish ,
164
169
}
@@ -285,7 +290,7 @@ impl Processor for FuseTableSource {
285
290
match self . state {
286
291
State :: Finish => Ok ( Event :: Finished ) ,
287
292
State :: ReadDataPrewhere ( _) => Ok ( Event :: Async ) ,
288
- State :: ReadDataRemain ( _, _, _ ) => Ok ( Event :: Async ) ,
293
+ State :: ReadDataRemain ( _, _) => Ok ( Event :: Async ) ,
289
294
State :: PrewhereFilter ( _, _) => Ok ( Event :: Sync ) ,
290
295
State :: Deserialize ( _, _, _) => Ok ( Event :: Sync ) ,
291
296
State :: Generated ( _, _) => Err ( ErrorCode :: LogicalError ( "It's a bug." ) ) ,
@@ -294,9 +299,28 @@ impl Processor for FuseTableSource {
294
299
295
300
fn process ( & mut self ) -> Result < ( ) > {
296
301
match std:: mem:: replace ( & mut self . state , State :: Finish ) {
297
- State :: Deserialize ( part, chunks, filter) => {
298
- let data_block = if let Some ( filter) = filter {
299
- let block = self . output_reader . deserialize ( part, chunks) ?;
302
+ State :: Deserialize ( part, chunks, prewhere_data) => {
303
+ let data_block = if let Some ( PrewhereData {
304
+ data_block : mut prewhere_blocks,
305
+ filter,
306
+ } ) = prewhere_data
307
+ {
308
+ let block = if chunks. is_empty ( ) {
309
+ prewhere_blocks
310
+ } else if let Some ( remain_reader) = self . remain_reader . as_ref ( ) {
311
+ let remain_block = remain_reader. deserialize ( part, chunks) ?;
312
+ for ( col, field) in remain_block
313
+ . columns ( )
314
+ . iter ( )
315
+ . zip ( remain_block. schema ( ) . fields ( ) )
316
+ {
317
+ prewhere_blocks =
318
+ prewhere_blocks. add_column ( col. clone ( ) , field. clone ( ) ) ?;
319
+ }
320
+ prewhere_blocks. resort ( self . output_reader . schema ( ) ) ?
321
+ } else {
322
+ return Err ( ErrorCode :: LogicalError ( "It's a bug. Need remain reader" ) ) ;
323
+ } ;
300
324
// the last step of prewhere
301
325
DataBlock :: filter_block ( block, & filter) ?
302
326
} else {
@@ -308,12 +332,10 @@ impl Processor for FuseTableSource {
308
332
}
309
333
State :: PrewhereFilter ( part, chunks) => {
310
334
// deserialize prewhere data block first
311
- let block = self
312
- . prewhere_reader
313
- . deserialize ( part. clone ( ) , chunks. clone ( ) ) ?;
335
+ let data_block = self . prewhere_reader . deserialize ( part. clone ( ) , chunks) ?;
314
336
if let Some ( filter) = self . prewhere_filter . as_ref ( ) {
315
337
// do filter
316
- let res = filter. execute ( & block ) ?;
338
+ let res = filter. execute ( & data_block ) ?;
317
339
let filter = DataBlock :: cast_to_nonull_boolean ( res. column ( 0 ) ) ?;
318
340
// shortcut, if predicates is const boolean (or can be cast to boolean)
319
341
if !DataBlock :: filter_exists ( & filter) ? {
@@ -324,10 +346,11 @@ impl Processor for FuseTableSource {
324
346
}
325
347
if self . remain_reader . is_none ( ) {
326
348
// shortcut, we don't need to read remain data
327
- let block = DataBlock :: filter_block ( block , & filter) ?;
349
+ let block = DataBlock :: filter_block ( data_block , & filter) ?;
328
350
self . generate_one_block ( block) ?;
329
351
} else {
330
- self . state = State :: ReadDataRemain ( part, chunks, filter) ;
352
+ self . state =
353
+ State :: ReadDataRemain ( part, PrewhereData { data_block, filter } ) ;
331
354
}
332
355
Ok ( ( ) )
333
356
} else {
@@ -353,12 +376,10 @@ impl Processor for FuseTableSource {
353
376
}
354
377
Ok ( ( ) )
355
378
}
356
- State :: ReadDataRemain ( part, prewhere_chunks , filter ) => {
379
+ State :: ReadDataRemain ( part, prewhere_data ) => {
357
380
if let Some ( remain_reader) = self . remain_reader . as_ref ( ) {
358
- let mut chunks = remain_reader. read_columns_data ( part. clone ( ) ) . await ?;
359
- // merge two parts of chunks
360
- chunks. extend ( prewhere_chunks) ;
361
- self . state = State :: Deserialize ( part, chunks, Some ( filter) ) ;
381
+ let chunks = remain_reader. read_columns_data ( part. clone ( ) ) . await ?;
382
+ self . state = State :: Deserialize ( part, chunks, Some ( prewhere_data) ) ;
362
383
Ok ( ( ) )
363
384
} else {
364
385
return Err ( ErrorCode :: LogicalError ( "It's a bug. No remain reader" ) ) ;
0 commit comments