@@ -1209,9 +1209,11 @@ impl PhysicalPlanner {
             }
             OpStruct::Filter(filter) => {
                 assert_eq!(children.len(), 1);
+                dbg!(filter);
                 let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
                 let predicate =
                     self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?;
+                dbg!(&predicate);

                 let filter: Arc<dyn ExecutionPlan> =
                     match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) {
@@ -1223,7 +1225,7 @@ impl PhysicalPlanner {
                             predicate,
                             Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
                         )?),
-                        (false, true) => Arc::new(DataFusionFilterExec::try_new(
+                        (false, true) => Arc::new(CometFilterExec::try_new(
                             predicate,
                             Arc::clone(&child.native_plan),
                         )?),
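Note: the two boolean flags are dispatched with a single match on a tuple, which forces every flag combination to be handled explicitly. A minimal sketch of the pattern, with stand-in labels rather than Comet's real ExecutionPlan variants:

fn describe(wrap_child_in_copy_exec: bool, use_datafusion_filter: bool) -> &'static str {
    // One arm per flag combination; the compiler rejects a missing case.
    match (wrap_child_in_copy_exec, use_datafusion_filter) {
        (true, true) => "variant A",
        (true, false) => "variant B",
        (false, true) => "variant C",
        (false, false) => "variant D",
    }
}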
@@ -1233,6 +1235,8 @@ impl PhysicalPlanner {
                         )?),
                     };

+                dbg!(&filter);
+
                 Ok((
                     scans,
                     Arc::new(SparkPlan::new(spark_plan.plan_id, filter, vec![child])),
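Note: the dbg! calls added in these hunks are plain std-library tracing. dbg! writes the source location and the Debug rendering of its argument to stderr, then returns the value, so it can wrap an expression (or take a reference, as with &predicate above) without disturbing the surrounding code. A minimal standalone sketch:

fn main() {
    let children = vec![1, 2, 3];
    // Prints something like `[src/main.rs:5] children.len() = 3` to stderr
    // and still evaluates to the length, so the binding works as before.
    let n = dbg!(children.len());
    assert_eq!(n, 3);
}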
@@ -1363,6 +1367,7 @@ impl PhysicalPlanner {
                 ))
             }
             OpStruct::NativeScan(scan) => {
+                dbg!(scan);
                 let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
                 let required_schema: SchemaRef =
                     convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
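Note: convert_spark_types_to_arrow_schema is Comet's own helper and is not shown in this diff; for orientation, the SchemaRef it produces is the usual arrow-rs construct. A hypothetical sketch (the field names and types are made up), assuming the arrow crate:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Stand-in for the schema the planner builds from Spark's type descriptors.
fn example_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]))
}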
@@ -1493,6 +1498,7 @@ impl PhysicalPlanner {
                 // The `ScanExec` operator will take actual arrays from Spark during execution
                 let scan =
                     ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
+
                 Ok((
                     vec![scan.clone()],
                     Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
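Note: throughout these hunks the child plan is shared via Arc::clone rather than copied; cloning an Arc only bumps a reference count, so handing the same plan node to a new SparkPlan parent is cheap. A small illustration using a plain Arc<str> in place of a real plan node:

use std::sync::Arc;

fn main() {
    let plan: Arc<str> = Arc::from("native_scan");
    let shared = Arc::clone(&plan); // refcount bump, no deep copy
    assert_eq!(Arc::strong_count(&plan), 2);
    assert!(Arc::ptr_eq(&plan, &shared)); // two handles, same allocation
}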