diff --git a/Cargo.lock b/Cargo.lock index cc5b62125a8..1b4bf0aac3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,6 +378,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73a47aa0c771b5381de2b7f16998d351a6f4eb839f1e13d48353e17e873d969b" dependencies = [ "bitflags", + "serde", + "serde_json", ] [[package]] @@ -1521,9 +1523,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe060b978f74ab446be722adb8a274e052e005bf6dfd171caadc3abaad10080" +checksum = "cc6cb8c2c81eada072059983657d6c9caf3fddefc43b4a65551d243253254a96" dependencies = [ "arrow", "arrow-ipc", @@ -1546,7 +1548,6 @@ dependencies = [ "datafusion-functions-aggregate", "datafusion-functions-table", "datafusion-functions-window", - "datafusion-macros", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", @@ -1560,7 +1561,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand 0.8.5", + "rand 0.9.1", "regex", "sqlparser", "tempfile", @@ -1571,9 +1572,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61fe34f401bd03724a1f96d12108144f8cd495a3cdda2bf5e091822fb80b7e66" +checksum = "b7be8d1b627843af62e447396db08fe1372d882c0eb8d0ea655fd1fbc33120ee" dependencies = [ "arrow", "async-trait", @@ -1597,9 +1598,9 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4411b8e3bce5e0fc7521e44f201def2e2d5d1b5f176fb56e8cdc9942c890f00" +checksum = "38ab16c5ae43f65ee525fc493ceffbc41f40dee38b01f643dfcfc12959e92038" dependencies = [ "arrow", "async-trait", @@ -1620,9 +1621,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0734015d81c8375eb5d4869b7f7ecccc2ee8d6cb81948ef737cd0e7b743bd69c" +checksum = "d3d56b2ac9f476b93ca82e4ef5fb00769c8a3f248d12b4965af7e27635fa7e12" dependencies = [ "ahash 0.8.12", "arrow", @@ -1643,9 +1644,9 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5167bb1d2ccbb87c6bc36c295274d7a0519b14afcfdaf401d53cbcaa4ef4968b" +checksum = "16015071202d6133bc84d72756176467e3e46029f3ce9ad2cb788f9b1ff139b2" dependencies = [ "futures", "log", @@ -1654,9 +1655,9 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04e602dcdf2f50c2abf297cc2203c73531e6f48b29516af7695d338cf2a778b1" +checksum = "b77523c95c89d2a7eb99df14ed31390e04ab29b43ff793e562bdc1716b07e17b" dependencies = [ "arrow", "async-trait", @@ -1676,7 +1677,7 @@ dependencies = [ "log", "object_store", "parquet", - "rand 0.8.5", + "rand 0.9.1", "tempfile", "tokio", "url", @@ -1684,9 +1685,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bb2253952dc32296ed5b84077cb2e0257fea4be6373e1c376426e17ead4ef6" +checksum = "40d25c5e2c0ebe8434beeea997b8e88d55b3ccc0d19344293f2373f65bc524fc" dependencies = [ "arrow", "async-trait", @@ -1709,9 +1710,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8c7f47a5d2fe03bfa521ec9bafdb8a5c82de8377f60967c3663f00c8790352" +checksum = "3dc6959e1155741ab35369e1dc7673ba30fc45ed568fad34c01b7cb1daeb4d4c" dependencies = [ "arrow", "async-trait", @@ -1734,9 +1735,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27d15868ea39ed2dc266728b554f6304acd473de2142281ecfa1294bb7415923" +checksum = "b7a6afdfe358d70f4237f60eaef26ae5a1ce7cb2c469d02d5fc6c7fd5d84e58b" dependencies = [ "arrow", "async-trait", @@ -1759,21 +1760,21 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand 0.8.5", + "rand 0.9.1", "tokio", ] [[package]] name = "datafusion-doc" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a91f8c2c5788ef32f48ff56c68e5b545527b744822a284373ac79bba1ba47292" +checksum = "9bcd8a3e3e3d02ea642541be23d44376b5d5c37c2938cce39b3873cdf7186eea" [[package]] name = "datafusion-execution" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06f004d100f49a3658c9da6fb0c3a9b760062d96cd4ad82ccc3b7b69a9fb2f84" +checksum = "670da1d45d045eee4c2319b8c7ea57b26cf48ab77b630aaa50b779e406da476a" dependencies = [ "arrow", "dashmap", @@ -1783,16 +1784,16 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand 0.8.5", + "rand 0.9.1", "tempfile", "url", ] [[package]] name = "datafusion-expr" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a4e4ce3802609be38eeb607ee72f6fe86c3091460de9dbfae9e18db423b3964" +checksum = "b3a577f64bdb7e2cc4043cd97f8901d8c504711fde2dbcb0887645b00d7c660b" dependencies = [ "arrow", "chrono", @@ -1810,9 +1811,9 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "422ac9cf3b22bbbae8cdf8ceb33039107fde1b5492693168f13bd566b1bcc839" +checksum = "51b7916806ace3e9f41884f230f7f38ebf0e955dfbd88266da1826f29a0b9a6a" dependencies = [ "arrow", "datafusion-common", @@ -1823,9 +1824,9 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ddf0a0a2db5d2918349c978d42d80926c6aa2459cd8a3c533a84ec4bb63479e" +checksum = "7fb31c9dc73d3e0c365063f91139dc273308f8a8e124adda9898db8085d68357" dependencies = [ "arrow", "arrow-buffer", @@ -1840,7 +1841,7 @@ dependencies = [ "hex", "itertools 0.14.0", "log", - "rand 0.8.5", + "rand 0.9.1", "regex", "unicode-segmentation", "uuid", @@ -1848,9 +1849,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "408a05dafdc70d05a38a29005b8b15e21b0238734dab1e98483fcb58038c5aba" +checksum = "ebb72c6940697eaaba9bd1f746a697a07819de952b817e3fb841fb75331ad5d4" dependencies = [ "ahash 0.8.12", "arrow", @@ -1869,9 +1870,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "756d21da2dd6c9bef97af1504970ff56cbf35d03fbd4ffd62827f02f4d2279d4" +checksum = "d7fdc54656659e5ecd49bf341061f4156ab230052611f4f3609612a0da259696" dependencies = [ "ahash 0.8.12", "arrow", @@ -1882,9 +1883,9 @@ dependencies = [ [[package]] name = "datafusion-functions-table" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc9a97220736c8fff1446e936be90d57216c06f28969f9ffd3b72ac93c958c8a" +checksum = "de2fc6c2946da5cab8364fb28b5cac3115f0f3a87960b235ed031c3f7e2e639b" dependencies = [ "arrow", "async-trait", @@ -1898,10 +1899,11 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cefc2d77646e1aadd1d6a9c40088937aedec04e68c5f0465939912e1291f8193" +checksum = "3e5746548a8544870a119f556543adcd88fe0ba6b93723fe78ad0439e0fbb8b4" dependencies = [ + "arrow", "datafusion-common", "datafusion-doc", "datafusion-expr", @@ -1915,9 +1917,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4aff082c42fa6da99ce0698c85addd5252928c908eb087ca3cfa64ff16b313" +checksum = "dcbe9404382cda257c434f22e13577bee7047031dfdb6216dd5e841b9465e6fe" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1925,9 +1927,9 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df6f88d7ee27daf8b108ba910f9015176b36fbc72902b1ca5c2a5f1d1717e1a1" +checksum = "8dce50e3b637dab0d25d04d2fe79dfdca2b257eabd76790bffd22c7f90d700c8" dependencies = [ "datafusion-expr", "quote", @@ -1936,9 +1938,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084d9f979c4b155346d3c34b18f4256e6904ded508e9554d90fed416415c3515" +checksum = "03cfaacf06445dc3bbc1e901242d2a44f2cae99a744f49f3fefddcee46240058" dependencies = [ "arrow", "chrono", @@ -1954,9 +1956,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c536062b0076f4e30084065d805f389f9fe38af0ca75bcbac86bc5e9fbab65" +checksum = "1908034a89d7b2630898e06863583ae4c00a0dd310c1589ca284195ee3f7f8a6" dependencies = [ "ahash 0.8.12", "arrow", @@ -1971,14 +1973,14 @@ dependencies = [ "itertools 0.14.0", "log", "paste", - "petgraph", + "petgraph 0.8.2", ] [[package]] name = "datafusion-physical-expr-common" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8a92b53b3193fac1916a1c5b8e3f4347c526f6822e56b71faa5fb372327a863" +checksum = "47b7a12dd59ea07614b67dbb01d85254fbd93df45bcffa63495e11d3bdf847df" dependencies = [ "ahash 0.8.12", "arrow", @@ -1990,9 +1992,9 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fa0a5ac94c7cf3da97bedabd69d6bbca12aef84b9b37e6e9e8c25286511b5e2" +checksum = "4371cc4ad33978cc2a8be93bd54a232d3f2857b50401a14631c0705f3f910aae" dependencies = [ "arrow", "datafusion-common", @@ -2008,9 +2010,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "690c615db468c2e5fe5085b232d8b1c088299a6c63d87fd960a354a71f7acb55" +checksum = "dc47bc33025757a5c11f2cd094c5b6b5ed87f46fa33c023e6fdfa25fcbfade23" dependencies = [ "ahash 0.8.12", "arrow", @@ -2038,9 +2040,9 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad229a134c7406c057ece00c8743c0c34b97f4e72f78b475fe17b66c5e14fa4f" +checksum = "d7485da32283985d6b45bd7d13a65169dcbe8c869e25d01b2cfbc425254b4b49" dependencies = [ "arrow", "async-trait", @@ -2062,9 +2064,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "47.0.0" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64f6ab28b72b664c21a27b22a2ff815fd390ed224c26e89a93b5a8154a4e8607" +checksum = "a466b15632befddfeac68c125f0260f569ff315c6831538cbb40db754134e0df" dependencies = [ "arrow", "bigdecimal", @@ -4132,6 +4134,18 @@ dependencies = [ "indexmap", ] +[[package]] +name = "petgraph" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54acf3a685220b533e437e264e4d932cfbdc4cc7ec0cd232ed73c08d03b8a7ca" +dependencies = [ + "fixedbitset", + "hashbrown 0.15.4", + "indexmap", + "serde", +] + [[package]] name = "phf" version = "0.11.3" @@ -4317,7 +4331,7 @@ dependencies = [ "log", "multimap", "once_cell", - "petgraph", + "petgraph 0.7.1", "prettyplease", "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 2e52ffd5b28..def02976d33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,16 +58,16 @@ anyhow = "1.0.95" arbitrary = "1.3.2" arcref = "0.2.0" arrayref = "0.3.7" -arrow = { version = "55", default-features = false } -arrow-arith = "55" -arrow-array = "55" -arrow-buffer = "55" -arrow-cast = "55" -arrow-data = "55" -arrow-ord = "55" -arrow-schema = "55" -arrow-select = "55" -arrow-string = "55" +arrow = { version = "55.1", default-features = false } +arrow-arith = "55.1" +arrow-array = "55.1" +arrow-buffer = "55.1" +arrow-cast = "55.1" +arrow-data = "55.1" +arrow-ord = "55.1" +arrow-schema = "55.1" +arrow-select = "55.1" +arrow-string = "55.1" async-stream = "0.3.6" async-trait = "0.1.88" bindgen = "0.71.1" @@ -78,15 +78,15 @@ bzip2 = "0.5.0" cbindgen = "0.29.0" cc = "1.2" cfg-if = "1" -chrono = "0.4.40" +chrono = "0.4.41" clap = "4.5" compio = { version = "0.14", features = ["io-uring"], default-features = false } crossbeam-queue = "0.3" crossterm = "0.28" dashmap = "6.1.0" -datafusion = { version = "47", default-features = false } -datafusion-common = { version = "47" } -datafusion-physical-plan = { version = "47" } +datafusion = { version = "48", default-features = false } +datafusion-common = { version = "48" } +datafusion-physical-plan = { version = "48" } divan = { package = "codspeed-divan-compat", version = "2.8.0" } duckdb = { path = "duckdb-vortex/duckdb-rs/crates/duckdb", features = [ "vtab-full", @@ -102,7 +102,7 @@ futures = { version = "0.3.31", default-features = false } futures-util = "0.3.31" glob = "0.3.2" goldenfile = "1" -half = { version = "2.5", features = ["std", "num-traits"] } +half = { version = "2.6", features = ["std", "num-traits"] } hashbrown = "0.15.1" homedir = "0.3.3" humansize = "2.1.3" @@ -124,7 +124,7 @@ once_cell = "1.21" opentelemetry = "0.29.0" opentelemetry-otlp = "0.29.0" opentelemetry_sdk = "0.29.0" -parquet = "55" +parquet = "55.1" paste = "1.0.15" pco = "0.4.4" pin-project = "1.1.5" @@ -163,14 +163,14 @@ taffy = "0.8.0" tar = "0.4" tempfile = "3" thiserror = "2.0.3" -tokio = "1.44.2" +tokio = "1.45.1" tokio-stream = "0.1.17" tracing = { version = "0.1.41" } tracing-chrome = "0.7.2" tracing-futures = "0.2.5" tracing-subscriber = "0.3.19" url = "2.5.4" -uuid = { version = "1.16", features = ["js"] } +uuid = { version = "1.17", features = ["js"] } walkdir = "2.5.0" wasm-bindgen-futures = "0.4.39" witchcraft-metrics = "1.0.1" diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs index 6842d45afdc..d2edc4a7849 100644 --- a/bench-vortex/src/clickbench.rs +++ b/bench-vortex/src/clickbench.rs @@ -216,8 +216,9 @@ pub async fn register_vortex_files( let table_url = ListingTableUrl::parse(vortex_path)?; - let config = - ListingTableConfig::new(table_url).with_listing_options(ListingOptions::new(format)); + let config = ListingTableConfig::new(table_url).with_listing_options( + ListingOptions::new(format).with_session_config_options(session.state().config()), + ); let config = if let Some(schema) = schema { config.with_schema(schema.into()) @@ -248,7 +249,9 @@ pub fn register_parquet_files( let table_url = ListingTableUrl::parse(table_path)?; let config = ListingTableConfig::new(table_url) - .with_listing_options(ListingOptions::new(format)) + .with_listing_options( + ListingOptions::new(format).with_session_config_options(session.state().config()), + ) .with_schema(schema.clone().into()); let listing_table = Arc::new(ListingTable::try_new(config)?); diff --git a/bench-vortex/src/datasets/file.rs b/bench-vortex/src/datasets/file.rs index 0442eec8307..bd57429b2d1 100644 --- a/bench-vortex/src/datasets/file.rs +++ b/bench-vortex/src/datasets/file.rs @@ -66,8 +66,9 @@ pub async fn register_parquet_files( info!("Registering table from {}", &parquet_path); let table_url = ListingTableUrl::parse(parquet_path)?; - let config = ListingTableConfig::new(table_url) - .with_listing_options(ListingOptions::new(format)); + let config = ListingTableConfig::new(table_url).with_listing_options( + ListingOptions::new(format).with_session_config_options(session.state().config()), + ); let config = if let Some(schema) = schema { config.with_schema(schema.into()) @@ -97,8 +98,9 @@ pub async fn register_vortex_files( // Register the Vortex file let format = Arc::new(VortexFormat::default()); let table_url = ListingTableUrl::parse(file_url.as_str())?; - let config = ListingTableConfig::new(table_url) - .with_listing_options(ListingOptions::new(format)); + let config = ListingTableConfig::new(table_url).with_listing_options( + ListingOptions::new(format).with_session_config_options(session.state().config()), + ); let config = if let Some(schema) = schema { config.with_schema(schema.into()) diff --git a/bench-vortex/src/public_bi.rs b/bench-vortex/src/public_bi.rs index 48dfc85b641..b77e3e8ed31 100644 --- a/bench-vortex/src/public_bi.rs +++ b/bench-vortex/src/public_bi.rs @@ -393,7 +393,10 @@ impl PBIData { let path = self.get_file_path(&table.name, file_type); let table_url = ListingTableUrl::parse(path.to_str().expect("unicode"))?; let config = ListingTableConfig::new(table_url) - .with_listing_options(ListingOptions::new(df_format)) + .with_listing_options( + ListingOptions::new(df_format) + .with_session_config_options(session.state().config()), + ) .with_schema(schema.into()); let listing_table = Arc::new(ListingTable::try_new(config)?); diff --git a/vortex-datafusion/examples/vortex_table.rs b/vortex-datafusion/examples/vortex_table.rs index ce5840ea139..f43204da2bf 100644 --- a/vortex-datafusion/examples/vortex_table.rs +++ b/vortex-datafusion/examples/vortex_table.rs @@ -57,7 +57,9 @@ async fn main() -> anyhow::Result<()> { .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?, )?; let config = ListingTableConfig::new(table_url) - .with_listing_options(ListingOptions::new(format)) + .with_listing_options( + ListingOptions::new(format).with_session_config_options(ctx.state().config()), + ) .infer_schema(&ctx.state()) .await?; diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index f6db570376e..4a4f94cf164 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -4,7 +4,9 @@ use std::fmt::Debug; use datafusion::arrow::datatypes::{DataType, Schema}; use datafusion::common::stats::Precision as DFPrecision; -use datafusion::logical_expr::{Expr, Operator}; +use datafusion::logical_expr::Operator; +use datafusion::physical_expr::PhysicalExprRef; +use datafusion::physical_plan::expressions::{BinaryExpr, Column, LikeExpr, Literal}; use vortex::stats::Precision; mod convert; @@ -47,26 +49,24 @@ fn supported_data_types(dt: DataType) -> bool { is_supported } -fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool { - match expr { - Expr::BinaryExpr(expr) - if expr.op.is_logic_operator() || SUPPORTED_BINARY_OPS.contains(&expr.op) => - { - can_be_pushed_down(expr.left.as_ref(), schema) - & can_be_pushed_down(expr.right.as_ref(), schema) - } - Expr::Column(col) => match schema.column_with_name(col.name()) { - Some((_, field)) => supported_data_types(field.data_type().clone()), - _ => false, - }, - Expr::Like(like) => { - can_be_pushed_down(&like.expr, schema) && can_be_pushed_down(&like.pattern, schema) - } - Expr::Literal(lit) => supported_data_types(lit.data_type()), - _ => { - log::debug!("DataFusion expression can't be pushed down: {expr:?}"); - false - } +fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool { + let expr = expr.as_any(); + if let Some(binary) = expr.downcast_ref::() { + (binary.op().is_logic_operator() || SUPPORTED_BINARY_OPS.contains(binary.op())) + && can_be_pushed_down(binary.left(), schema) + && can_be_pushed_down(binary.right(), schema) + } else if let Some(col) = expr.downcast_ref::() { + schema + .column_with_name(col.name()) + .map(|(_, field)| supported_data_types(field.data_type().clone())) + .unwrap_or(false) + } else if let Some(like) = expr.downcast_ref::() { + can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema) + } else if let Some(lit) = expr.downcast_ref::() { + supported_data_types(lit.value().data_type()) + } else { + log::debug!("DataFusion expression can't be pushed down: {expr:?}"); + false } } diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 72dc4465c7b..d8ab52e1de8 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -12,15 +12,14 @@ use datafusion::common::{ config_datafusion_err, not_impl_err, }; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; -use datafusion::datasource::file_format::{FileFormat, FileFormatFactory, FilePushdownSupport}; +use datafusion::datasource::file_format::{FileFormat, FileFormatFactory}; use datafusion::datasource::physical_plan::{ FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource, }; use datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::source::DataSourceExec; -use datafusion::logical_expr::Expr; use datafusion::logical_expr::dml::InsertOp; -use datafusion::physical_expr::{LexRequirement, PhysicalExpr}; +use datafusion::physical_expr::LexRequirement; use datafusion::physical_plan::ExecutionPlan; use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream}; use itertools::Itertools; @@ -28,7 +27,6 @@ use object_store::{ObjectMeta, ObjectStore}; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::{VortexExpect, VortexResult, vortex_err}; -use vortex::expr::{ExprRef, VortexExpr, and}; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::metrics::VortexMetrics; use vortex::session::VortexSession; @@ -38,8 +36,8 @@ use vortex::stats::{Stat, StatsProviderExt, StatsSet}; use super::cache::VortexFileCache; use super::sink::VortexSink; use super::source::VortexSource; -use crate::convert::{TryFromDataFusion, TryToDataFusion}; -use crate::{PrecisionExt as _, can_be_pushed_down}; +use crate::PrecisionExt as _; +use crate::convert::TryToDataFusion; /// Vortex implementation of a DataFusion [`FileFormat`]. pub struct VortexFormat { @@ -298,7 +296,6 @@ impl FileFormat for VortexFormat { &self, _state: &dyn Session, file_scan_config: FileScanConfig, - filters: Option<&Arc>, ) -> DFResult> { if file_scan_config .file_groups @@ -317,11 +314,7 @@ impl FileFormat for VortexFormat { return not_impl_err!("Vortex doesn't support output ordering"); } - let mut source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone()); - if let Some(predicate) = make_vortex_predicate(filters) { - source = source.with_predicate(predicate); - } - + let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone()); Ok(DataSourceExec::from_data_source( FileScanConfigBuilder::from(file_scan_config) .with_source(Arc::new(source)) @@ -350,23 +343,6 @@ impl FileFormat for VortexFormat { Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _) } - fn supports_filters_pushdown( - &self, - _file_schema: &Schema, - table_schema: &Schema, - filters: &[&Expr], - ) -> DFResult { - let is_pushdown = filters - .iter() - .all(|expr| can_be_pushed_down(expr, table_schema)); - - if is_pushdown { - Ok(FilePushdownSupport::Supported) - } else { - Ok(FilePushdownSupport::NotSupportedForFilter) - } - } - fn file_source(&self) -> Arc { Arc::new(VortexSource::new( self.file_cache.clone(), @@ -375,22 +351,6 @@ impl FileFormat for VortexFormat { } } -pub(crate) fn make_vortex_predicate( - predicate: Option<&Arc>, -) -> Option> { - predicate - // If we cannot convert an expr to a vortex expr, we run no filter, since datafusion - // will rerun the filter expression anyway. - .and_then(|expr| { - // This splits expressions into conjunctions and converts them to vortex expressions. - // Any inconvertible expressions are dropped since true /\ a == a. - datafusion::physical_expr::split_conjunction(expr) - .into_iter() - .filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok()) - .reduce(and) - }) -} - #[cfg(test)] mod tests { use datafusion::execution::SessionStateBuilder; diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 78f9f3e025b..f739255763b 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -96,7 +96,9 @@ mod tests { assert!(table_url.is_collection()); let config = ListingTableConfig::new(table_url) - .with_listing_options(ListingOptions::new(format)) + .with_listing_options( + ListingOptions::new(format).with_session_config_options(ctx.state().config()), + ) .infer_schema(&ctx.state()) .await?; diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index 108c247108c..18109837b71 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -127,6 +127,7 @@ mod tests { use datafusion::execution::SessionStateBuilder; use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Values}; use datafusion::prelude::SessionContext; + use datafusion::scalar::ScalarValue; use tempfile::TempDir; use crate::persistent::{VortexFormatFactory, register_vortex_format_factory}; @@ -157,8 +158,8 @@ mod tests { let values = Values { schema: Arc::new(my_tbl.schema().clone()), values: vec![vec![ - Expr::Literal("hello".into()), - Expr::Literal(42_i32.into()), + Expr::Literal(ScalarValue::new_utf8view("hello"), None), + Expr::Literal(42_i32.into(), None), ]], }; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 66dc0a74188..8ed8f061244 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -4,12 +4,17 @@ use std::sync::{Arc, Weak}; use dashmap::DashMap; use datafusion::arrow::datatypes::SchemaRef; use datafusion::common::{Result as DFResult, Statistics}; +use datafusion::config::ConfigOptions; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource}; +use datafusion::physical_plan::PhysicalExpr; +use datafusion::physical_plan::filter_pushdown::{ + FilterPushdownPropagation, PredicateSupport, PredicateSupports, +}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use object_store::ObjectStore; use object_store::path::Path; use vortex::error::VortexExpect as _; -use vortex::expr::{VortexExpr, root}; +use vortex::expr::{ExprRef, VortexExpr, and, root}; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; use vortex::metrics::VortexMetrics; @@ -18,6 +23,8 @@ use super::cache::VortexFileCache; use super::config::{ConfigProjection, FileScanConfigExt}; use super::metrics::PARTITION_LABEL; use super::opener::VortexFileOpener; +use crate::can_be_pushed_down; +use crate::convert::TryFromDataFusion as _; /// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans. /// @@ -158,4 +165,49 @@ impl FileSource for VortexSource { fn file_type(&self) -> &str { VORTEX_FILE_EXTENSION } + + fn try_pushdown_filters( + &self, + filters: Vec>, + _config: &ConfigOptions, + ) -> DFResult>> { + let Some(schema) = self.arrow_schema.as_ref() else { + return Ok(FilterPushdownPropagation::unsupported(filters)); + }; + let (supported, unsupported): (Vec<_>, Vec<_>) = filters + .iter() + .partition(|expr| can_be_pushed_down(expr, schema)); + + match make_vortex_predicate(&supported) { + Some(predicate) => { + let supports = PredicateSupports::new( + supported + .into_iter() + .map(|expr| PredicateSupport::Supported(expr.clone())) + .chain( + unsupported + .into_iter() + .map(|expr| PredicateSupport::Unsupported(expr.clone())), + ) + .collect(), + ); + Ok(FilterPushdownPropagation::with_filters(supports) + .with_updated_node(Arc::new(self.with_predicate(predicate)))) + } + None => Ok(FilterPushdownPropagation::unsupported(filters)), + } + } +} + +// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion +// will rerun the filter expression anyway. +pub(crate) fn make_vortex_predicate( + predicate: &[&Arc], +) -> Option> { + // This splits expressions into conjunctions and converts them to vortex expressions. + // Any inconvertible expressions are dropped since true /\ a == a. + predicate + .iter() + .filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok()) + .reduce(and) }