datafusion 48 #3560

Merged (5 commits) on Jun 19, 2025
138 changes: 76 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions Cargo.toml
@@ -58,16 +58,16 @@ anyhow = "1.0.95"
arbitrary = "1.3.2"
arcref = "0.2.0"
arrayref = "0.3.7"
arrow = { version = "55", default-features = false }
arrow-arith = "55"
arrow-array = "55"
arrow-buffer = "55"
arrow-cast = "55"
arrow-data = "55"
arrow-ord = "55"
arrow-schema = "55"
arrow-select = "55"
arrow-string = "55"
arrow = { version = "55.1", default-features = false }
arrow-arith = "55.1"
arrow-array = "55.1"
arrow-buffer = "55.1"
arrow-cast = "55.1"
arrow-data = "55.1"
arrow-ord = "55.1"
arrow-schema = "55.1"
arrow-select = "55.1"
arrow-string = "55.1"
async-stream = "0.3.6"
async-trait = "0.1.88"
bindgen = "0.71.1"
@@ -78,15 +78,15 @@ bzip2 = "0.5.0"
cbindgen = "0.29.0"
cc = "1.2"
cfg-if = "1"
chrono = "0.4.40"
chrono = "0.4.41"
clap = "4.5"
compio = { version = "0.14", features = ["io-uring"], default-features = false }
crossbeam-queue = "0.3"
crossterm = "0.28"
dashmap = "6.1.0"
datafusion = { version = "47", default-features = false }
datafusion-common = { version = "47" }
datafusion-physical-plan = { version = "47" }
datafusion = { version = "48", default-features = false }
datafusion-common = { version = "48" }
datafusion-physical-plan = { version = "48" }
divan = { package = "codspeed-divan-compat", version = "2.8.0" }
duckdb = { path = "duckdb-vortex/duckdb-rs/crates/duckdb", features = [
"vtab-full",
@@ -102,7 +102,7 @@ futures = { version = "0.3.31", default-features = false }
futures-util = "0.3.31"
glob = "0.3.2"
goldenfile = "1"
half = { version = "2.5", features = ["std", "num-traits"] }
half = { version = "2.6", features = ["std", "num-traits"] }
hashbrown = "0.15.1"
homedir = "0.3.3"
humansize = "2.1.3"
@@ -124,7 +124,7 @@ once_cell = "1.21"
opentelemetry = "0.29.0"
opentelemetry-otlp = "0.29.0"
opentelemetry_sdk = "0.29.0"
parquet = "55"
parquet = "55.1"
paste = "1.0.15"
pco = "0.4.4"
pin-project = "1.1.5"
@@ -163,14 +163,14 @@ taffy = "0.8.0"
tar = "0.4"
tempfile = "3"
thiserror = "2.0.3"
tokio = "1.44.2"
tokio = "1.45.1"
tokio-stream = "0.1.17"
tracing = { version = "0.1.41" }
tracing-chrome = "0.7.2"
tracing-futures = "0.2.5"
tracing-subscriber = "0.3.19"
url = "2.5.4"
uuid = { version = "1.16", features = ["js"] }
uuid = { version = "1.17", features = ["js"] }
walkdir = "2.5.0"
wasm-bindgen-futures = "0.4.39"
witchcraft-metrics = "1.0.1"
9 changes: 6 additions & 3 deletions bench-vortex/src/clickbench.rs
@@ -216,8 +216,9 @@ pub async fn register_vortex_files(

let table_url = ListingTableUrl::parse(vortex_path)?;

let config =
ListingTableConfig::new(table_url).with_listing_options(ListingOptions::new(format));
let config = ListingTableConfig::new(table_url).with_listing_options(
ListingOptions::new(format).with_session_config_options(session.state().config()),
);

let config = if let Some(schema) = schema {
config.with_schema(schema.into())
@@ -248,7 +249,7 @@ pub fn register_parquet_files(
let table_url = ListingTableUrl::parse(table_path)?;

let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format))
.with_listing_options(
ListingOptions::new(format).with_session_config_options(session.state().config()),
)
.with_schema(schema.clone().into());

let listing_table = Arc::new(ListingTable::try_new(config)?);
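The change repeated across these benchmark helpers is wiring the session's configuration into the ListingOptions via with_session_config_options instead of relying on the defaults. A minimal sketch of the pattern in isolation, assuming DataFusion 48 with its default features and an existing SessionContext; the function name, the "data/" path, the "example" table name, and the use of ParquetFormat are illustrative (the benchmarks use VortexFormat for the Vortex tables):

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_with_session_options(ctx: &SessionContext) -> Result<()> {
    // Hypothetical location; substitute the real table path.
    let table_url = ListingTableUrl::parse("data/")?;
    let format = Arc::new(ParquetFormat::default());

    // Feed the session's configuration (e.g. target partitions, statistics collection)
    // into the listing options instead of using the ListingOptions defaults.
    let config = ListingTableConfig::new(table_url)
        .with_listing_options(
            ListingOptions::new(format).with_session_config_options(ctx.state().config()),
        )
        .infer_schema(&ctx.state())
        .await?;

    let _ = ctx.register_table("example", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}

Calling this per table keeps the listing options in sync with whatever was configured on the SessionContext.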
10 changes: 6 additions & 4 deletions bench-vortex/src/datasets/file.rs
@@ -66,8 +66,9 @@ pub async fn register_parquet_files(
info!("Registering table from {}", &parquet_path);
let table_url = ListingTableUrl::parse(parquet_path)?;

let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format));
let config = ListingTableConfig::new(table_url).with_listing_options(
ListingOptions::new(format).with_session_config_options(session.state().config()),
);

let config = if let Some(schema) = schema {
config.with_schema(schema.into())
@@ -97,8 +98,9 @@
// Register the Vortex file
let format = Arc::new(VortexFormat::default());
let table_url = ListingTableUrl::parse(file_url.as_str())?;
let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format));
let config = ListingTableConfig::new(table_url).with_listing_options(
ListingOptions::new(format).with_session_config_options(session.state().config()),
);

let config = if let Some(schema) = schema {
config.with_schema(schema.into())
5 changes: 4 additions & 1 deletion bench-vortex/src/public_bi.rs
@@ -393,7 +393,10 @@ impl PBIData {
let path = self.get_file_path(&table.name, file_type);
let table_url = ListingTableUrl::parse(path.to_str().expect("unicode"))?;
let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(df_format))
.with_listing_options(
ListingOptions::new(df_format)
.with_session_config_options(session.state().config()),
)
.with_schema(schema.into());

let listing_table = Arc::new(ListingTable::try_new(config)?);
4 changes: 3 additions & 1 deletion vortex-datafusion/examples/vortex_table.rs
@@ -57,7 +57,9 @@ async fn main() -> anyhow::Result<()> {
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?,
)?;
let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format))
.with_listing_options(
ListingOptions::new(format).with_session_config_options(ctx.state().config()),
)
.infer_schema(&ctx.state())
.await?;

42 changes: 21 additions & 21 deletions vortex-datafusion/src/lib.rs
@@ -4,7 +4,9 @@ use std::fmt::Debug;

use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::common::stats::Precision as DFPrecision;
use datafusion::logical_expr::{Expr, Operator};
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::PhysicalExprRef;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, LikeExpr, Literal};
use vortex::stats::Precision;

mod convert;
@@ -47,26 +49,24 @@ fn supported_data_types(dt: DataType) -> bool {
is_supported
}

fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool {
match expr {
Expr::BinaryExpr(expr)
if expr.op.is_logic_operator() || SUPPORTED_BINARY_OPS.contains(&expr.op) =>
{
can_be_pushed_down(expr.left.as_ref(), schema)
& can_be_pushed_down(expr.right.as_ref(), schema)
}
Expr::Column(col) => match schema.column_with_name(col.name()) {
Some((_, field)) => supported_data_types(field.data_type().clone()),
_ => false,
},
Expr::Like(like) => {
can_be_pushed_down(&like.expr, schema) && can_be_pushed_down(&like.pattern, schema)
}
Expr::Literal(lit) => supported_data_types(lit.data_type()),
_ => {
log::debug!("DataFusion expression can't be pushed down: {expr:?}");
false
}
fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
Review thread on this line:

Contributor: Was just thinking last night, do we support new things here? We have In now too, right? (@joseph-isaacs?)

Contributor: But it's pretty slow right now; what type would it run over?

Contributor: Whatever is available, I guess? If it's not a perf win right now we can add that later.

let expr = expr.as_any();
if let Some(binary) = expr.downcast_ref::<BinaryExpr>() {
(binary.op().is_logic_operator() || SUPPORTED_BINARY_OPS.contains(binary.op()))
&& can_be_pushed_down(binary.left(), schema)
&& can_be_pushed_down(binary.right(), schema)
} else if let Some(col) = expr.downcast_ref::<Column>() {
schema
.column_with_name(col.name())
.map(|(_, field)| supported_data_types(field.data_type().clone()))
.unwrap_or(false)
} else if let Some(like) = expr.downcast_ref::<LikeExpr>() {
can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
} else if let Some(lit) = expr.downcast_ref::<Literal>() {
supported_data_types(lit.value().data_type())
} else {
log::debug!("DataFusion expression can't be pushed down: {expr:?}");
false
}
}

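On the review thread above about IN-list support: purely as a hypothetical sketch, not part of this PR, an IN-list check could downcast to DataFusion's InListExpr the same way the rewritten can_be_pushed_down handles BinaryExpr and LikeExpr. The helper name below and the restriction to a schema column probed against plain literals are illustrative assumptions:

use datafusion::arrow::datatypes::Schema;
use datafusion::physical_expr::PhysicalExprRef;
use datafusion::physical_plan::expressions::{Column, InListExpr, Literal};

// Hypothetical helper, not in the Vortex codebase: accepts `col [NOT] IN (lit, ...)`
// only when the column exists in the file schema and every list element is a literal.
fn in_list_can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
    let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() else {
        return false;
    };
    // The probe side must be a column present in the schema.
    let probe_ok = in_list
        .expr()
        .as_any()
        .downcast_ref::<Column>()
        .and_then(|col| schema.column_with_name(col.name()))
        .is_some();
    // Every list element must be a plain literal.
    let list_ok = in_list
        .list()
        .iter()
        .all(|e| e.as_any().downcast_ref::<Literal>().is_some());
    probe_ok && list_ok
}

A real version would also gate on the column's data type, as the diff does for the other expression kinds via supported_data_types.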
50 changes: 5 additions & 45 deletions vortex-datafusion/src/persistent/format.rs
@@ -12,23 +12,21 @@ use datafusion::common::{
config_datafusion_err, not_impl_err,
};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::{FileFormat, FileFormatFactory, FilePushdownSupport};
use datafusion::datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion::datasource::physical_plan::{
FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::Expr;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::physical_expr::{LexRequirement, PhysicalExpr};
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_plan::ExecutionPlan;
use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
use itertools::Itertools;
use object_store::{ObjectMeta, ObjectStore};
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::{VortexExpect, VortexResult, vortex_err};
use vortex::expr::{ExprRef, VortexExpr, and};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::metrics::VortexMetrics;
use vortex::session::VortexSession;
@@ -38,8 +36,8 @@ use vortex::stats::{Stat, StatsProviderExt, StatsSet};
use super::cache::VortexFileCache;
use super::sink::VortexSink;
use super::source::VortexSource;
use crate::convert::{TryFromDataFusion, TryToDataFusion};
use crate::{PrecisionExt as _, can_be_pushed_down};
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
@@ -298,7 +296,6 @@ impl FileFormat for VortexFormat {
&self,
_state: &dyn Session,
file_scan_config: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
if file_scan_config
.file_groups
@@ -317,11 +314,7 @@
return not_impl_err!("Vortex doesn't support output ordering");
}

let mut source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
if let Some(predicate) = make_vortex_predicate(filters) {
source = source.with_predicate(predicate);
}

let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
Ok(DataSourceExec::from_data_source(
FileScanConfigBuilder::from(file_scan_config)
.with_source(Arc::new(source))
@@ -350,23 +343,6 @@ impl FileFormat for VortexFormat {
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}

fn supports_filters_pushdown(
&self,
_file_schema: &Schema,
table_schema: &Schema,
filters: &[&Expr],
) -> DFResult<FilePushdownSupport> {
let is_pushdown = filters
.iter()
.all(|expr| can_be_pushed_down(expr, table_schema));

if is_pushdown {
Ok(FilePushdownSupport::Supported)
} else {
Ok(FilePushdownSupport::NotSupportedForFilter)
}
}

fn file_source(&self) -> Arc<dyn FileSource> {
Arc::new(VortexSource::new(
self.file_cache.clone(),
Expand All @@ -375,22 +351,6 @@ impl FileFormat for VortexFormat {
}
}

pub(crate) fn make_vortex_predicate(
predicate: Option<&Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn VortexExpr>> {
predicate
// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
// will rerun the filter expression anyway.
.and_then(|expr| {
// This splits expressions into conjunctions and converts them to vortex expressions.
// Any inconvertible expressions are dropped since true /\ a == a.
datafusion::physical_expr::split_conjunction(expr)
.into_iter()
.filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok())
.reduce(and)
})
}

#[cfg(test)]
mod tests {
use datafusion::execution::SessionStateBuilder;
Expand Down
4 changes: 3 additions & 1 deletion vortex-datafusion/src/persistent/mod.rs
@@ -96,7 +96,9 @@ mod tests {
assert!(table_url.is_collection());

let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format))
.with_listing_options(
ListingOptions::new(format).with_session_config_options(ctx.state().config()),
)
.infer_schema(&ctx.state())
.await?;

5 changes: 3 additions & 2 deletions vortex-datafusion/src/persistent/sink.rs
@@ -127,6 +127,7 @@ mod tests {
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Values};
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;
use tempfile::TempDir;

use crate::persistent::{VortexFormatFactory, register_vortex_format_factory};
@@ -157,8 +158,8 @@ mod tests {
let values = Values {
schema: Arc::new(my_tbl.schema().clone()),
values: vec![vec![
Expr::Literal("hello".into()),
Expr::Literal(42_i32.into()),
Expr::Literal(ScalarValue::new_utf8view("hello"), None),
Expr::Literal(42_i32.into(), None),
]],
};

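The two-argument literals in this test reflect a DataFusion 48 change: Expr::Literal now carries an optional metadata field alongside the ScalarValue, so direct construction passes None, while the lit helper fills that in. A small illustrative sketch (the function name is just for demonstration):

use datafusion::logical_expr::Expr;
use datafusion::prelude::lit;
use datafusion::scalar::ScalarValue;

// Demonstration only: literal expressions against DataFusion 48, none carrying metadata.
fn literal_examples() -> Vec<Expr> {
    vec![
        // Direct construction, as in the test above: value plus `None` metadata.
        Expr::Literal(ScalarValue::new_utf8view("hello"), None),
        Expr::Literal(ScalarValue::Int32(Some(42)), None),
        // The `lit` helper builds the same shape with the metadata defaulted to `None`.
        lit(42_i32),
    ]
}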