From 4629f021bd92b9b73db1ddb7ec6f923c97d5b9bd Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Wed, 19 Jun 2024 13:51:40 -0400 Subject: [PATCH 1/3] Add distinct_on to dataframe api #11011 --- datafusion/core/src/dataframe/mod.rs | 116 +++++++++++++++++++++++++++ docs/source/user-guide/dataframe.md | 1 + 2 files changed, 117 insertions(+) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index b5c58eff577c..9b262570d72c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -486,6 +486,37 @@ impl DataFrame { }) } + /// Return a new `DataFrame` with duplicated rows removed as per the specified expression list + /// according to the provided sorting expressions grouped by the `DISTINCT ON` clause + /// expressions. + /// + /// # Example + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + /// let df = df.distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?; + /// # Ok(()) + /// # } + /// ``` + pub fn distinct_on( + self, + on_expr: Vec, + select_expr: Vec, + sort_expr: Option>, + ) -> Result { + let plan = LogicalPlanBuilder::from(self.plan) + .distinct_on(on_expr, select_expr, sort_expr)? + .build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) + } + /// Return a new `DataFrame` that has statistics for a DataFrame. /// /// Only summarizes numeric datatypes at the moment and returns nulls for @@ -2190,6 +2221,91 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_distinct_on() -> Result<()> { + let t = test_table().await?; + let plan = t + .distinct_on(vec![col("c1")], vec![col("aggregate_test_100.c1")], None) + .unwrap(); + + let sql_plan = + create_plan("select distinct on (c1) c1 from aggregate_test_100").await?; + + assert_same_plan(&plan.plan.clone(), &sql_plan); + + let df_results = plan.clone().collect().await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!( + ["+----+", + "| c1 |", + "+----+", + "| a |", + "| b |", + "| c |", + "| d |", + "| e |", + "+----+"], + &df_results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_distinct_on_sort_by() -> Result<()> { + let t = test_table().await?; + let plan = t + .select(vec![col("c1")]) + .unwrap() + .distinct_on( + vec![col("c1")], + vec![col("c1")], + Some(vec![col("c1").sort(true, true)]), + ) + .unwrap() + .sort(vec![col("c1").sort(true, true)]) + .unwrap(); + + let df_results = plan.clone().collect().await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!( + ["+----+", + "| c1 |", + "+----+", + "| a |", + "| b |", + "| c |", + "| d |", + "| e |", + "+----+"], + &df_results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_distinct_on_sort_by_unprojected() -> Result<()> { + let t = test_table().await?; + let err = t + .select(vec![col("c1")]) + .unwrap() + .distinct_on( + vec![col("c1")], + vec![col("c1")], + Some(vec![col("c1").sort(true, true)]), + ) + .unwrap() + // try to sort on some value not present in input to distinct + .sort(vec![col("c2").sort(true, true)]) + .unwrap_err(); + assert_eq!(err.strip_backtrace(), "Error during planning: For SELECT DISTINCT, ORDER BY expressions c2 must appear in select list"); + + Ok(()) + } + #[tokio::test] async fn join() -> Result<()> { let left = test_table().await?.select_columns(&["c1", "c2"])?; diff --git a/docs/source/user-guide/dataframe.md b/docs/source/user-guide/dataframe.md index c0210200a246..d768db48f76e 100644 --- a/docs/source/user-guide/dataframe.md +++ b/docs/source/user-guide/dataframe.md @@ -64,6 +64,7 @@ execution. The plan is evaluated (executed) when an action method is invoked, su | ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | | aggregate | Perform an aggregate query with optional grouping expressions. | | distinct | Filter out duplicate rows. | +| distinct_on | Filter out duplicate rows based on provided expressions. | | except | Calculate the exception of two DataFrames. The two DataFrames must have exactly the same schema | | filter | Filter a DataFrame to only include rows that match the specified filter expression. | | intersect | Calculate the intersection of two DataFrames. The two DataFrames must have exactly the same schema | From 792bc02c2d581cf634de476e5b1de29b1bf7b197 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Wed, 19 Jun 2024 15:46:07 -0400 Subject: [PATCH 2/3] cargo fmt --- datafusion/core/src/dataframe/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 9b262570d72c..d0915785bbb8 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2227,7 +2227,7 @@ mod tests { let plan = t .distinct_on(vec![col("c1")], vec![col("aggregate_test_100.c1")], None) .unwrap(); - + let sql_plan = create_plan("select distinct on (c1) c1 from aggregate_test_100").await?; From 7219fbd988d2f39158601f934ef7d8644f846bab Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 20 Jun 2024 13:23:02 -0400 Subject: [PATCH 3/3] Update datafusion/core/src/dataframe/mod.rs as per reviewer feedback Co-authored-by: Andrew Lamb --- datafusion/core/src/dataframe/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index d0915785bbb8..fae961a070f3 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -497,8 +497,9 @@ impl DataFrame { /// # #[tokio::main] /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); - /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; - /// let df = df.distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?; + /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await? + /// // Return a single row (a, b) for each distinct value of a + /// .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?; /// # Ok(()) /// # } /// ```