Fix: Preserve sorting for the COPY TO plan #16785

Merged
merged 5 commits into from
Jul 17, 2025

Conversation

Contributor

@bert-beyondloops bert-beyondloops commented Jul 15, 2025

Which issue does this PR close?

Rationale for this change

INSERT INTO already takes ordering requirements into account during planning; COPY TO does not yet.
It appears this was simply never implemented for COPY TO.

This sometimes results in order requirements (sorting) being eliminated during physical plan optimizations.
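
To illustrate the rationale, here is a minimal toy sketch of why a sort can vanish when the sink does not declare the ordering it needs. The `Plan` enum, `remove_unneeded_sorts`, `copy_plan`, and `contains_sort` are all hypothetical names invented for this illustration; they are not DataFusion types or APIs, and the real optimizer rules are considerably more involved.

```rust
// Toy model only: these types are hypothetical and are NOT DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Source,
    Sort { key: String, input: Box<Plan> },
    // `required_ordering` stands in for the ordering a sink asks of its input.
    Sink { required_ordering: Option<String>, input: Box<Plan> },
    Union { inputs: Vec<Plan> },
}

/// Drops every sort whose ordering the parent node does not ask for.
/// `required` is the ordering requested by the parent, if any.
fn remove_unneeded_sorts(plan: Plan, required: Option<&str>) -> Plan {
    match plan {
        Plan::Source => Plan::Source,
        Plan::Sort { key, input } => {
            if required == Some(key.as_str()) {
                // The parent needs this order, so the sort stays.
                Plan::Sort { key, input: Box::new(remove_unneeded_sorts(*input, None)) }
            } else {
                // Nobody asked for this order, so the sort is treated as redundant.
                remove_unneeded_sorts(*input, None)
            }
        }
        Plan::Sink { required_ordering, input } => {
            // The sink passes its own requirement down to its input.
            let child = remove_unneeded_sorts(*input, required_ordering.as_deref());
            Plan::Sink { required_ordering, input: Box::new(child) }
        }
        Plan::Union { inputs } => Plan::Union {
            inputs: inputs.into_iter().map(|p| remove_unneeded_sorts(p, None)).collect(),
        },
    }
}

/// Builds a union whose first branch is sink <- sort <- source.
fn copy_plan(required_ordering: Option<String>) -> Plan {
    let sorted = Plan::Sort { key: "column1".to_string(), input: Box::new(Plan::Source) };
    Plan::Union {
        inputs: vec![
            Plan::Sink { required_ordering, input: Box::new(sorted) },
            Plan::Source,
        ],
    }
}

/// Returns true if any sort node survives anywhere in the plan.
fn contains_sort(plan: &Plan) -> bool {
    match plan {
        Plan::Source => false,
        Plan::Sort { .. } => true,
        Plan::Sink { input, .. } => contains_sort(input),
        Plan::Union { inputs } => inputs.iter().any(contains_sort),
    }
}

fn main() {
    // Sink declares no requirement: the sort is removed.
    let before = remove_unneeded_sorts(copy_plan(None), None);
    assert!(!contains_sort(&before));

    // Sink declares the requirement: the sort is kept.
    let after = remove_unneeded_sorts(copy_plan(Some("column1".to_string())), None);
    assert!(contains_sort(&after));
}
```

In this toy model the only difference between the two runs is whether the sink reports an ordering requirement, which mirrors the idea of the fix: the requirement has to be visible during planning so that later optimization passes do not treat the sort as redundant.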

What changes are included in this PR?

Are these changes tested?

A new test via dataframe/logical plan builder has been added.

Are there any user-facing changes?

Not via SQL; the change only affects plans built through the DataFrame and/or LogicalPlanBuilder API.

@github-actions github-actions bot added the core Core DataFusion crate label Jul 15, 2025
@bert-beyondloops bert-beyondloops force-pushed the copy_to_preserve_ordering branch from 418f15d to 0c70580 Compare July 15, 2025 12:19
Contributor

@alamb alamb left a comment

Thanks @bert-beyondloops

I also double checked that COPY in SQL preserves the order

> explain COPY ( SELECT * FROM values(1),(2) ORDER BY column1) TO '/tmp/foo.csv';
+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │        DataSinkExec       │ |
|               | │    --------------------   │ |
|               | │     file: /tmp/foo.csv    │ |
|               | │                           │ |
|               | │        format: csv        │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │          SortExec         │ |
|               | │    --------------------   │ |
|               | │  column1@0 ASC NULLS LAST │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      │ |
|               | │    --------------------   │ |
|               | │         bytes: 128        │ |
|               | │       format: memory      │ |
|               | │          rows: 1          │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.000 seconds.

@@ -6193,3 +6194,59 @@ async fn test_copy_schema() -> Result<()> {
assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
Ok(())
}

#[tokio::test]
async fn test_copy_to_preserves_order() -> Result<()> {

I verified that this test fails without the code change in the PR:

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Snapshot Summary ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Snapshot: copy_to_preserves_order
Source: datafusion/core/tests/dataframe/mod.rs:6241
────────────────────────────────────────────────────────────────────────────────
Expression: physical_plan_format
────────────────────────────────────────────────────────────────────────────────
-old snapshot
+new results
────────────┬───────────────────────────────────────────────────────────────────
    0     0 │ UnionExec
    1     1 │   DataSinkExec: sink=CsvSink(file_groups=[])
    2       │-    SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
    3       │-      DataSourceExec: partitions=1, partition_sizes=[1]
          2 │+    DataSourceExec: partitions=1, partition_sizes=[1]
    4     3 │   DataSourceExec: partitions=1, partition_sizes=[1]
────────────┴───────────────────────────────────────────────────────────────────

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@bert-beyondloops
Contributor Author

Thanks @alamb for the quick review 🙏

Contributor

@zhuqi-lucas zhuqi-lucas left a comment

LGTM Thank you @bert-beyondloops !

Member

@xudong963 xudong963 left a comment

The failing CI does not seem to be related to the PR.

@alamb
Contributor

alamb commented Jul 16, 2025

close/reopen to retrigger CI

@alamb alamb closed this Jul 16, 2025
@alamb alamb reopened this Jul 16, 2025
@alamb alamb merged commit 01234eb into apache:main Jul 17, 2025
27 checks passed
@alamb
Contributor

alamb commented Jul 17, 2025

Thanks again @bert-beyondloops

Successfully merging this pull request may close these issues.

CopyTo plan loses ordering requirement during physical plan optimization
4 participants