Skip to content

Add support for automatic join column deduplication in DataFrame joins #1185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
460bae9
refactor: style loading logic in DataFrameHtmlFormatter
kosiew Jul 4, 2025
ab224a6
Add deduplicate option to DataFrame.join to drop duplicate join columns
kosiew Jul 8, 2025
42f5a72
test: add deduplicate join selection tests for DataFrame
kosiew Jul 8, 2025
a58a574
test: add test for select after deduplicated join on DataFrame
kosiew Jul 8, 2025
fa80aa6
docs: enhance joins.rst with details on DataFrame naming and deduplic…
kosiew Jul 8, 2025
7d7146c
feat: introduce JoinKeys dataclass for improved join key handling in …
kosiew Jul 8, 2025
f246224
docs: enhance _prepare_deduplicate docstring with detailed parameter …
kosiew Jul 8, 2025
0ad1b00
feat: implement collision-safe temporary aliases for join column rena…
kosiew Jul 8, 2025
01af791
fix: update sorting in test_join_deduplicate_multi to include multipl…
kosiew Jul 8, 2025
4dd5369
test: add deduplication tests for all join types in test_join_dedupli…
kosiew Jul 8, 2025
ec769fe
test: enhance join deduplication tests with schema validation and ind…
kosiew Jul 8, 2025
19d69ca
fix: improve error messages for join key validation in DataFrame
kosiew Jul 8, 2025
0bb81df
feat: enhance join operation preparation with JoinPreparation class
kosiew Jul 8, 2025
3dc96f3
fix join deduplicate and tests
kosiew Jul 8, 2025
628dc15
docs: add example for selecting columns after deduplication in joins
kosiew Jul 9, 2025
ba54f7d
fix: add SessionContext import for DataFrame creation example in join…
kosiew Jul 9, 2025
584c600
fix: add safety comment for type checking in join keys assignment
kosiew Jul 9, 2025
17ce6eb
fix Ruff errors
kosiew Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion docs/source/user-guide/common-operations/joins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,48 @@ the right table.

.. ipython:: python

left.join(right, left_on="customer_id", right_on="id", how="anti")
left.join(right, left_on="customer_id", right_on="id", how="anti")

Disambiguating Columns
----------------------

When the join key exists in both DataFrames under the same name, the result contains two columns with that name. Assign a name to each DataFrame to use as a prefix and avoid ambiguity.

When you create a DataFrame with a ``name`` argument, that name is used as a prefix in ``col("name.column")`` to reference specific columns.

.. ipython:: python

from datafusion import col, SessionContext
ctx = SessionContext()
left = ctx.from_pydict({"id": [1, 2]}, name="l")
right = ctx.from_pydict({"id": [2, 3]}, name="r")
joined = left.join(right, on="id")
joined.select(col("l.id"), col("r.id"))

Note that the columns in the result appear in the same order as specified in the ``select()`` call.

You can remove the duplicate column after joining. Note that ``drop()`` returns a new DataFrame (DataFusion's API is immutable).

.. ipython:: python

joined.drop("r.id")

Automatic Deduplication
----------------------

Use the ``deduplicate`` argument of :py:meth:`DataFrame.join` to automatically
drop the duplicate join column from the right DataFrame. Unlike PySpark which uses a ``_`` suffix by default,
DataFusion uses the ``__right_<col>`` naming convention for conflicting columns when not using deduplication.

.. ipython:: python

left.join(right, on="id", deduplicate=True)

After deduplication, you can select the join column (which comes from the left DataFrame) and other columns as usual:

.. ipython:: python

# Select the id column and other columns from both DataFrames
joined_dedup = left.join(right, on="id", deduplicate=True)
joined_dedup.select("id", "customer", "name")

156 changes: 144 additions & 12 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

from __future__ import annotations

import uuid
import warnings
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -44,6 +46,8 @@
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.record_batch import RecordBatchStream

from .functions import coalesce, col

if TYPE_CHECKING:
import pathlib
from typing import Callable, Sequence
Expand All @@ -57,6 +61,49 @@
from enum import Enum


@dataclass
class JoinKeys:
"""Represents the resolved join keys for a DataFrame join operation."""

on: str | Sequence[str] | None
left_names: list[str]
right_names: list[str]


@dataclass
class JoinPreparation:
"""Represents the complete preparation for a DataFrame join operation."""

join_keys: JoinKeys
modified_right: DataFrame
drop_cols: list[str]


def _deduplicate_right(
right: DataFrame, columns: Sequence[str]
) -> tuple[DataFrame, list[str]]:
"""Rename join columns on the right DataFrame for deduplication."""
existing_columns = set(right.schema().names)
modified = right
aliases: list[str] = []

for col_name in columns:
base_alias = f"__right_{col_name}"
alias = base_alias
counter = 0
while alias in existing_columns:
counter += 1
alias = f"{base_alias}_{counter}"
if alias in existing_columns:
alias = f"__temp_{uuid.uuid4().hex[:8]}_{col_name}"

modified = modified.with_column_renamed(col_name, alias)
aliases.append(alias)
existing_columns.add(alias)

return modified, aliases


# excerpt from deltalake
# https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163
class Compression(Enum):
Expand Down Expand Up @@ -678,6 +725,7 @@ def join(
left_on: str | Sequence[str] | None = None,
right_on: str | Sequence[str] | None = None,
join_keys: tuple[list[str], list[str]] | None = None,
deduplicate: bool = False,
) -> DataFrame:
"""Join this :py:class:`DataFrame` with another :py:class:`DataFrame`.

Expand All @@ -691,13 +739,72 @@ def join(
left_on: Join column of the left dataframe.
right_on: Join column of the right dataframe.
join_keys: Tuple of two lists of column names to join on. [Deprecated]
deduplicate: If ``True``, drop duplicate join columns from the
right DataFrame similar to PySpark's ``on`` behavior.

Returns:
DataFrame after join.
"""
# This check is to prevent breaking API changes where users prior to
# DF 43.0.0 would pass the join_keys as a positional argument instead
# of a keyword argument.
join_preparation = self._prepare_join(
right, on, left_on, right_on, join_keys, deduplicate
)

result = DataFrame(
self.df.join(
join_preparation.modified_right.df,
how,
join_preparation.join_keys.left_names,
join_preparation.join_keys.right_names,
)
)

if (
deduplicate
and how in ("right", "full")
and join_preparation.join_keys.on is not None
):
for left_name, right_alias in zip(
join_preparation.join_keys.left_names,
join_preparation.drop_cols,
):
result = result.with_column(
left_name, coalesce(col(left_name), col(right_alias))
)

if join_preparation.drop_cols:
result = result.drop(*join_preparation.drop_cols)

return result

def _prepare_join(
self,
right: DataFrame,
on: str | Sequence[str] | tuple[list[str], list[str]] | None,
left_on: str | Sequence[str] | None,
right_on: str | Sequence[str] | None,
join_keys: tuple[list[str], list[str]] | None,
deduplicate: bool,
) -> JoinPreparation:
"""Prepare join keys and handle deduplication if requested.

This method combines join key resolution and deduplication preparation
to avoid parameter handling duplication and provide a unified interface.

Args:
right: The right DataFrame to join with.
on: Column names to join on in both dataframes.
left_on: Join column of the left dataframe.
right_on: Join column of the right dataframe.
join_keys: Tuple of two lists of column names to join on. [Deprecated]
deduplicate: If True, prepare right DataFrame for column deduplication.

Returns:
JoinPreparation containing resolved join keys, modified right DataFrame,
and columns to drop after joining.
"""
# Step 1: Resolve join keys
# Handle the special case where on is a tuple of lists (legacy format)
resolved_on: str | Sequence[str] | None
if (
isinstance(on, tuple)
and len(on) == 2
Expand All @@ -706,7 +813,9 @@ def join(
):
# We know this is safe because we've checked the types
join_keys = on # type: ignore[assignment]
on = None
resolved_on = None
else:
resolved_on = on # type: ignore[assignment]

if join_keys is not None:
warnings.warn(
Expand All @@ -717,25 +826,48 @@ def join(
left_on = join_keys[0]
right_on = join_keys[1]

if on is not None:
if resolved_on is not None:
if left_on is not None or right_on is not None:
error_msg = "`left_on` or `right_on` should not provided with `on`"
raise ValueError(error_msg)
left_on = on
right_on = on
left_on = resolved_on
right_on = resolved_on
elif left_on is not None or right_on is not None:
if left_on is None or right_on is None:
error_msg = "`left_on` and `right_on` should both be provided."
raise ValueError(error_msg)
else:
error_msg = "either `on` or `left_on` and `right_on` should be provided."
raise ValueError(error_msg)
if isinstance(left_on, str):
left_on = [left_on]
if isinstance(right_on, str):
right_on = [right_on]

return DataFrame(self.df.join(right.df, how, left_on, right_on))
# At this point, left_on and right_on are guaranteed to be non-None
if left_on is None or right_on is None: # pragma: no cover - sanity check
msg = "join keys resolved to None"
raise ValueError(msg)

left_names = [left_on] if isinstance(left_on, str) else list(left_on)
right_names = [right_on] if isinstance(right_on, str) else list(right_on)

drop_cols: list[str] = []
modified_right = right

if deduplicate and resolved_on is not None:
on_cols = (
[resolved_on] if isinstance(resolved_on, str) else list(resolved_on)
)
modified_right, aliases = _deduplicate_right(right, on_cols)
drop_cols.extend(aliases)
right_names = aliases.copy()

join_keys_resolved = JoinKeys(
on=resolved_on, left_names=left_names, right_names=right_names
)

return JoinPreparation(
join_keys=join_keys_resolved,
modified_right=modified_right,
drop_cols=drop_cols,
)

def join_on(
self,
Expand Down
Loading
Loading