@@ -20,6 +20,7 @@ use std::sync::Arc;
20
20
use crate :: parquet:: Unit :: Page ;
21
21
use crate :: parquet:: { ContextWithParquet , Scenario } ;
22
22
23
+ use arrow:: array:: RecordBatch ;
23
24
use datafusion:: datasource:: file_format:: parquet:: ParquetFormat ;
24
25
use datafusion:: datasource:: file_format:: FileFormat ;
25
26
use datafusion:: datasource:: listing:: PartitionedFile ;
@@ -40,7 +41,11 @@ use futures::StreamExt;
40
41
use object_store:: path:: Path ;
41
42
use object_store:: ObjectMeta ;
42
43
43
- async fn get_parquet_exec ( state : & SessionState , filter : Expr ) -> DataSourceExec {
44
+ async fn get_parquet_exec (
45
+ state : & SessionState ,
46
+ filter : Expr ,
47
+ pushdown_filters : bool ,
48
+ ) -> DataSourceExec {
44
49
let object_store_url = ObjectStoreUrl :: local_filesystem ( ) ;
45
50
let store = state. runtime_env ( ) . object_store ( & object_store_url) . unwrap ( ) ;
46
51
@@ -78,7 +83,8 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
78
83
let source = Arc :: new (
79
84
ParquetSource :: default ( )
80
85
. with_predicate ( predicate)
81
- . with_enable_page_index ( true ) ,
86
+ . with_enable_page_index ( true )
87
+ . with_pushdown_filters ( pushdown_filters) ,
82
88
) ;
83
89
let base_config = FileScanConfigBuilder :: new ( object_store_url, schema, source)
84
90
. with_file ( partitioned_file)
@@ -87,38 +93,44 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
87
93
DataSourceExec :: new ( Arc :: new ( base_config) )
88
94
}
89
95
96
+ async fn get_filter_results (
97
+ state : & SessionState ,
98
+ filter : Expr ,
99
+ pushdown_filters : bool ,
100
+ ) -> Vec < RecordBatch > {
101
+ let parquet_exec = get_parquet_exec ( state, filter, pushdown_filters) . await ;
102
+ let task_ctx = state. task_ctx ( ) ;
103
+ let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
104
+ let mut batches = Vec :: new ( ) ;
105
+ while let Some ( Ok ( batch) ) = results. next ( ) . await {
106
+ batches. push ( batch) ;
107
+ }
108
+ batches
109
+ }
110
+
90
111
#[ tokio:: test]
91
112
async fn page_index_filter_one_col ( ) {
92
113
let session_ctx = SessionContext :: new ( ) ;
93
114
let state = session_ctx. state ( ) ;
94
- let task_ctx = state. task_ctx ( ) ;
95
115
96
116
// 1.create filter month == 1;
97
117
let filter = col ( "month" ) . eq ( lit ( 1_i32 ) ) ;
98
118
99
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
100
-
101
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
102
-
103
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
104
-
119
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
105
120
// `month = 1` from the page index should create below RowSelection
106
121
// vec.push(RowSelector::select(312));
107
122
// vec.push(RowSelector::skip(3330));
108
123
// vec.push(RowSelector::select(339));
109
124
// vec.push(RowSelector::skip(3319));
110
125
// total 651 row
111
- assert_eq ! ( batch. num_rows( ) , 651 ) ;
126
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 651 ) ;
127
+
128
+ let batches = get_filter_results ( & state, filter, true ) . await ;
129
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 620 ) ;
112
130
113
131
// 2. create filter month == 1 or month == 2;
114
132
let filter = col ( "month" ) . eq ( lit ( 1_i32 ) ) . or ( col ( "month" ) . eq ( lit ( 2_i32 ) ) ) ;
115
-
116
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
117
-
118
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
119
-
120
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
121
-
133
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
122
134
// `month = 1` or `month = 2` from the page index should create below RowSelection
123
135
// vec.push(RowSelector::select(312));
124
136
// vec.push(RowSelector::skip(900));
@@ -128,95 +140,78 @@ async fn page_index_filter_one_col() {
128
140
// vec.push(RowSelector::skip(873));
129
141
// vec.push(RowSelector::select(318));
130
142
// vec.push(RowSelector::skip(2128));
131
- assert_eq ! ( batch. num_rows( ) , 1281 ) ;
143
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 1281 ) ;
144
+
145
+ let batches = get_filter_results ( & state, filter, true ) . await ;
146
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 1180 ) ;
132
147
133
148
// 3. create filter month == 1 and month == 12;
134
149
let filter = col ( "month" )
135
150
. eq ( lit ( 1_i32 ) )
136
151
. and ( col ( "month" ) . eq ( lit ( 12_i32 ) ) ) ;
152
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
153
+ assert ! ( batches. is_empty( ) ) ;
137
154
138
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
139
-
140
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
141
-
142
- let batch = results. next ( ) . await ;
143
-
144
- assert ! ( batch. is_none( ) ) ;
155
+ let batches = get_filter_results ( & state, filter, true ) . await ;
156
+ assert ! ( batches. is_empty( ) ) ;
145
157
146
158
// 4.create filter 0 < month < 2 ;
147
159
let filter = col ( "month" ) . gt ( lit ( 0_i32 ) ) . and ( col ( "month" ) . lt ( lit ( 2_i32 ) ) ) ;
148
-
149
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
150
-
151
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
152
-
153
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
154
-
160
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
155
161
// should be the same as `month = 1`
156
- assert_eq ! ( batch. num_rows( ) , 651 ) ;
157
-
158
- let session_ctx = SessionContext :: new ( ) ;
159
- let task_ctx = session_ctx. task_ctx ( ) ;
162
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 651 ) ;
163
+ let batches = get_filter_results ( & state, filter, true ) . await ;
164
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 620 ) ;
160
165
161
166
// 5. create filter date_string_col == "01/01/09";
162
167
// Note this test doesn't apply type coercion so the literal must match the actual view type
163
168
let filter = col ( "date_string_col" ) . eq ( lit ( ScalarValue :: new_utf8view ( "01/01/09" ) ) ) ;
164
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
165
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
166
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
169
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
170
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 14 ) ;
167
171
168
172
// only two pages should match the filter
169
173
// min max
170
174
// page-20 0 01/01/09 01/02/09
171
175
// page-21 0 01/01/09 01/01/09
172
176
// each 7 rows
173
- assert_eq ! ( batch. num_rows( ) , 14 ) ;
177
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 14 ) ;
178
+ let batches = get_filter_results ( & state, filter, true ) . await ;
179
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 10 ) ;
174
180
}
175
181
176
182
#[ tokio:: test]
177
183
async fn page_index_filter_multi_col ( ) {
178
184
let session_ctx = SessionContext :: new ( ) ;
179
185
let state = session_ctx. state ( ) ;
180
- let task_ctx = session_ctx. task_ctx ( ) ;
181
186
182
187
// create filter month == 1 and year = 2009;
183
188
let filter = col ( "month" ) . eq ( lit ( 1_i32 ) ) . and ( col ( "year" ) . eq ( lit ( 2009 ) ) ) ;
184
-
185
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
186
-
187
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
188
-
189
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
190
-
189
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
191
190
// `year = 2009` from the page index should create below RowSelection
192
191
// vec.push(RowSelector::select(3663));
193
192
// vec.push(RowSelector::skip(3642));
194
193
// combine with `month = 1` total 333 row
195
- assert_eq ! ( batch. num_rows( ) , 333 ) ;
194
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 333 ) ;
195
+ let batches = get_filter_results ( & state, filter, true ) . await ;
196
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 310 ) ;
196
197
197
198
// create filter (year = 2009 or id = 1) and month = 1;
198
199
// this should only use `month = 1` to evaluate the page index.
199
200
let filter = col ( "month" )
200
201
. eq ( lit ( 1_i32 ) )
201
202
. and ( col ( "year" ) . eq ( lit ( 2009 ) ) . or ( col ( "id" ) . eq ( lit ( 1 ) ) ) ) ;
202
-
203
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
204
-
205
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
206
-
207
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
208
- assert_eq ! ( batch. num_rows( ) , 651 ) ;
203
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
204
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 651 ) ;
205
+ let batches = get_filter_results ( & state, filter, true ) . await ;
206
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 310 ) ;
209
207
210
208
// create filter (year = 2009 or id = 1)
211
209
// this filter uses two columns, so it will not be pushed down
212
210
let filter = col ( "year" ) . eq ( lit ( 2009 ) ) . or ( col ( "id" ) . eq ( lit ( 1 ) ) ) ;
213
-
214
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
215
-
216
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
217
-
218
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
219
- assert_eq ! ( batch. num_rows( ) , 7300 ) ;
211
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
212
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 7300 ) ;
213
+ let batches = get_filter_results ( & state, filter, true ) . await ;
214
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 3650 ) ;
220
215
221
216
// create filter (year = 2009 and id = 1) or (year = 2010)
222
217
// this filter uses two columns, so it will not be pushed down
@@ -226,13 +221,10 @@ async fn page_index_filter_multi_col() {
226
221
. eq ( lit ( 2009 ) )
227
222
. and ( col ( "id" ) . eq ( lit ( 1 ) ) )
228
223
. or ( col ( "year" ) . eq ( lit ( 2010 ) ) ) ;
229
-
230
- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
231
-
232
- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
233
-
234
- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
235
- assert_eq ! ( batch. num_rows( ) , 7300 ) ;
224
+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
225
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 7300 ) ;
226
+ let batches = get_filter_results ( & state, filter, true ) . await ;
227
+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 3651 ) ;
236
228
}
237
229
238
230
async fn test_prune (
0 commit comments