Commit c6c338c

[Data] Added support for Joins (using hash-shuffle) (#52728)
## Why are these changes needed?

This PR adds a `Dataset.join` operator allowing 2 datasets to be joined using the previously added [hash-shuffle implementation](#52664).

Currently, the following types of joins are supported:

- Inner
- Outer (left/right)

In the future we'll be adding support for more join types.

## Changes

- Added `JoinOperator`
- Added `Dataset.join`
- Added tests

## Related issue number

Closes #18911 (Finally!!!)

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
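As a quick illustration of the new API, here is a minimal usage sketch. The public `Dataset.join` signature is not part of the diff shown below, so the key-column keyword (`on`) is an assumption based on the logical operator's parameters; `join_type` and `num_partitions` mirror its signature:

```python
import ray

doubles = ray.data.from_items([{"id": i, "double": 2 * i} for i in range(4)])
squares = ray.data.from_items([{"id": i, "square": i * i} for i in range(4)])

joined = doubles.join(
    squares,
    join_type="inner",   # one of: inner, left_outer, right_outer, full_outer
    num_partitions=2,    # number of hash partitions (and output blocks)
    on=("id",),          # key columns; keyword name assumed, not shown in this diff
)
print(sorted(joined.take_all(), key=lambda r: r["id"]))
```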
1 parent a7f1f24 commit c6c338c

File tree

7 files changed: +937 −11 lines changed

python/ray/data/BUILD

Lines changed: 15 additions & 0 deletions
```diff
@@ -310,6 +310,21 @@ py_test(
     ],
 )
 
+py_test(
+    name = "test_join",
+    size = "medium",
+    srcs = ["tests/test_join.py"],
+    tags = [
+        "data_non_parallel",
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
+
 py_test(
     name = "test_binary",
     size = "small",
```
Lines changed: 241 additions & 0 deletions
```python
import logging
import math
from typing import Any, Dict, List, Optional, Tuple

from ray.data._internal.execution.operators.hash_shuffle import (
    HashShufflingOperatorBase,
    StatefulShuffleAggregation,
)
from ray.data._internal.logical.operators.join_operator import JoinType
from ray.data import DataContext
from ray.data._internal.arrow_block import ArrowBlockBuilder
from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.util import GiB
from ray.data.block import Block

_JOIN_TYPE_TO_ARROW_JOIN_VERB_MAP = {
    JoinType.INNER: "inner",
    JoinType.LEFT_OUTER: "left outer",
    JoinType.RIGHT_OUTER: "right outer",
    JoinType.FULL_OUTER: "full outer",
}


logger = logging.getLogger(__name__)


class JoiningShuffleAggregation(StatefulShuffleAggregation):
    """Aggregation performing a distributed join of 2 input sequences
    using hash-based shuffling.

    Applying hash-based shuffling with the same partitioning scheme to
    both input sequences makes it possible to:

        - Accumulate identical keys from both sequences into the same
          (numerical) partition; in other words, all keys such that

              hash(key) % num_partitions = partition_id

        - Perform the join on individual partitions independently
          (of other partitions).

    For the actual join, PyArrow's native join functionality is used,
    providing excellent performance while keeping the data from being
    deserialized.
    """

    def __init__(
        self,
        *,
        aggregator_id: int,
        join_type: JoinType,
        left_key_col_names: Tuple[str],
        right_key_col_names: Tuple[str],
        target_partition_ids: List[int],
        left_columns_suffix: Optional[str] = None,
        right_columns_suffix: Optional[str] = None,
    ):
        super().__init__(aggregator_id)

        assert (
            len(left_key_col_names) > 0
        ), "At least 1 column to join on has to be provided"
        assert len(right_key_col_names) == len(
            left_key_col_names
        ), "Number of columns for left and right join operands has to match"

        assert join_type in _JOIN_TYPE_TO_ARROW_JOIN_VERB_MAP, (
            f"Join type is not currently supported (got: {join_type}; "
            f"supported: {[jt for jt in JoinType]})"  # noqa: C416
        )

        self._left_key_col_names: Tuple[str] = left_key_col_names
        self._right_key_col_names: Tuple[str] = right_key_col_names
        self._join_type: JoinType = join_type

        self._left_columns_suffix: Optional[str] = left_columns_suffix
        self._right_columns_suffix: Optional[str] = right_columns_suffix

        # Partition builders for the partitions corresponding to the
        # left and right input sequences respectively
        self._left_input_seq_partition_builders: Dict[int, ArrowBlockBuilder] = {
            partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids
        }

        self._right_input_seq_partition_builders: Dict[int, ArrowBlockBuilder] = {
            partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids
        }

    def accept(self, input_seq_id: int, partition_id: int, partition_shard: Block):
        assert 0 <= input_seq_id < 2

        partition_builder = self._get_partition_builder(
            input_seq_id=input_seq_id,
            partition_id=partition_id,
        )

        partition_builder.add_block(partition_shard)

    def finalize(self, partition_id: int) -> Block:
        import pyarrow as pa

        # Build the accumulated shards of both sides into single Arrow tables
        left_seq_partition: pa.Table = self._get_partition_builder(
            input_seq_id=0, partition_id=partition_id
        ).build()
        right_seq_partition: pa.Table = self._get_partition_builder(
            input_seq_id=1, partition_id=partition_id
        ).build()

        arrow_join_type = _JOIN_TYPE_TO_ARROW_JOIN_VERB_MAP[self._join_type]

        joined = left_seq_partition.join(
            right_seq_partition,
            join_type=arrow_join_type,
            keys=list(self._left_key_col_names),
            right_keys=list(self._right_key_col_names),
            left_suffix=self._left_columns_suffix,
            right_suffix=self._right_columns_suffix,
        )

        return joined

    def clear(self, partition_id: int):
        self._left_input_seq_partition_builders.pop(partition_id)
        self._right_input_seq_partition_builders.pop(partition_id)

    def _get_partition_builder(self, *, input_seq_id: int, partition_id: int):
        if input_seq_id == 0:
            partition_builder = self._left_input_seq_partition_builders[partition_id]
        elif input_seq_id == 1:
            partition_builder = self._right_input_seq_partition_builders[partition_id]
        else:
            raise ValueError(
                f"Unexpected input sequence id of '{input_seq_id}' (expected 0 or 1)"
            )
        return partition_builder
```
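To make the co-partitioning invariant concrete, here is a minimal, self-contained sketch (illustrative only, not Ray code): because both sides are routed with the same `hash(key) % num_partitions` rule, matching keys always meet in the same partition, so each partition can be joined with PyArrow independently and the results concatenated:

```python
import pyarrow as pa

num_partitions = 2
left_rows = [{"id": i, "double": 2 * i} for i in range(6)]
right_rows = [{"id": i, "square": i * i} for i in range(6)]

def partition(rows):
    # The same routing rule on both sides sends equal keys to equal partitions
    parts = {p: [] for p in range(num_partitions)}
    for row in rows:
        parts[hash(row["id"]) % num_partitions].append(row)
    return parts

left_parts, right_parts = partition(left_rows), partition(right_rows)

# Each partition is joined independently, mirroring `finalize` above
per_partition_joins = [
    pa.Table.from_pylist(left_parts[p]).join(
        pa.Table.from_pylist(right_parts[p]), keys=["id"], join_type="inner"
    )
    for p in range(num_partitions)
]
print(pa.concat_tables(per_partition_joins).sort_by("id").to_pylist())
```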
```python
class JoinOperator(HashShufflingOperatorBase):
    def __init__(
        self,
        data_context: DataContext,
        left_input_op: PhysicalOperator,
        right_input_op: PhysicalOperator,
        left_key_columns: Tuple[str],
        right_key_columns: Tuple[str],
        join_type: JoinType,
        *,
        num_partitions: int,
        left_columns_suffix: Optional[str] = None,
        right_columns_suffix: Optional[str] = None,
        partition_size_hint: Optional[int] = None,
        aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            name=f"Join(num_partitions={num_partitions})",
            input_ops=[left_input_op, right_input_op],
            data_context=data_context,
            key_columns=[left_key_columns, right_key_columns],
            num_partitions=num_partitions,
            partition_size_hint=partition_size_hint,
            partition_aggregation_factory=(
                lambda aggregator_id, target_partition_ids: JoiningShuffleAggregation(
                    aggregator_id=aggregator_id,
                    left_key_col_names=left_key_columns,
                    right_key_col_names=right_key_columns,
                    join_type=join_type,
                    target_partition_ids=target_partition_ids,
                    left_columns_suffix=left_columns_suffix,
                    right_columns_suffix=right_columns_suffix,
                )
            ),
            aggregator_ray_remote_args_override=aggregator_ray_remote_args_override,
        )

    def _get_default_num_cpus_per_partition(self) -> float:
        """
        CPU allocation for the aggregating actors of the Join operator is
        calculated as:

            num_cpus (per partition) = CPU budget / # partitions

        Assuming:
        - Default number of partitions: 64
        - Total operator's CPU budget with default settings: 8 cores
        - Number of CPUs per partition: 8 / 64 = 0.125

        These CPU budgets are derived such that a Ray Data pipeline can run
        on a single node (using the default settings).
        """
        return 0.125

    def _get_operator_num_cpus_per_partition_override(self) -> float:
        return self.data_context.join_operator_actor_num_cpus_per_partition_override

    @classmethod
    def _estimate_aggregator_memory_allocation(
        cls,
        *,
        num_aggregators: int,
        num_partitions: int,
        partition_byte_size_estimate: int,
    ) -> int:
        dataset_size = num_partitions * partition_byte_size_estimate
        # Estimate of the object store memory required to accommodate all
        # partitions handled by a single aggregator
        #
        # NOTE: x2 due to 2 sequences involved in joins
        aggregator_shuffle_object_store_memory_required: int = math.ceil(
            2 * dataset_size / num_aggregators
        )
        # Estimate of the memory required to perform the actual (in-memory)
        # join operation (inclusive of the 50% overhead allocated for the
        # PyArrow join implementation)
        #
        # NOTE:
        #   - x2 due to 2 partitions (from the left/right sequences)
        #   - x1.5 due to the 50% overhead of the in-memory join
        join_memory_required: int = math.ceil(partition_byte_size_estimate * 3)
        # Estimate of the memory required to accommodate a single partition
        # as an output (inside the Object Store)
        #
        # NOTE: x2 due to 2 sequences involved in joins
        output_object_store_memory_required: int = 2 * partition_byte_size_estimate

        aggregator_total_memory_required: int = (
            # Inputs (object store)
            aggregator_shuffle_object_store_memory_required
            +
            # Join (heap)
            join_memory_required
            +
            # Output (object store)
            output_object_store_memory_required
        )

        logger.debug(
            f"Estimated memory requirement for joining aggregator "
            f"(partitions={num_partitions}, aggregators={num_aggregators}): "
            f"shuffle={aggregator_shuffle_object_store_memory_required / GiB:.2f}GiB, "
            f"joining={join_memory_required / GiB:.2f}GiB, "
            f"output={output_object_store_memory_required / GiB:.2f}GiB, "
            f"total={aggregator_total_memory_required / GiB:.2f}GiB, "
        )

        return aggregator_total_memory_required
```
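As a sanity check on the arithmetic above, here is the same estimate evaluated with illustrative numbers (64 partitions of 256 MiB spread across 8 aggregators; the figures are made up for the example):

```python
import math

GiB = 1024**3
num_partitions = 64
num_aggregators = 8
partition_byte_size_estimate = 256 * 1024**2  # 256 MiB, illustrative

dataset_size = num_partitions * partition_byte_size_estimate  # 16 GiB
# Inputs in the object store: x2 for the two joined sequences
shuffle = math.ceil(2 * dataset_size / num_aggregators)       # 4 GiB
# In-memory join: x2 partitions, x1.5 PyArrow overhead
join = math.ceil(partition_byte_size_estimate * 3)            # 0.75 GiB
# Output in the object store: x2 for the two sequences
output = 2 * partition_byte_size_estimate                     # 0.5 GiB

total = shuffle + join + output
print(f"total per aggregator ~= {total / GiB:.2f} GiB")       # ~5.25 GiB
```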
Lines changed: 116 additions & 0 deletions
```python
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple

from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.n_ary_operator import NAry

if TYPE_CHECKING:
    from ray.data import Schema


class JoinType(Enum):
    INNER = "inner"
    LEFT_OUTER = "left_outer"
    RIGHT_OUTER = "right_outer"
    FULL_OUTER = "full_outer"


class Join(NAry):
    """Logical operator for join."""

    def __init__(
        self,
        left_input_op: LogicalOperator,
        right_input_op: LogicalOperator,
        join_type: str,
        left_key_columns: Tuple[str],
        right_key_columns: Tuple[str],
        *,
        num_partitions: int,
        left_columns_suffix: Optional[str] = None,
        right_columns_suffix: Optional[str] = None,
        partition_size_hint: Optional[int] = None,
        aggregator_ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        """
        Args:
            left_input_op: The input operator at the left-hand side.
            right_input_op: The input operator at the right-hand side.
            join_type: The kind of join that should be performed, one of
                ("inner", "left_outer", "right_outer", "full_outer").
            left_key_columns: The columns from the left Dataset that should
                be used as keys of the join operation.
            right_key_columns: The columns from the right Dataset that should
                be used as keys of the join operation.
            partition_size_hint: Hint to the joining operator about the
                estimated average expected size of the resulting partition
                (in bytes).
            num_partitions: Total number of expected blocks outputted by this
                operator.
        """

        try:
            join_type_enum = JoinType(join_type)
        except ValueError:
            raise ValueError(
                f"Invalid join type: '{join_type}'. "
                f"Supported join types are: {', '.join(jt.value for jt in JoinType)}."
            )

        super().__init__(left_input_op, right_input_op, num_outputs=num_partitions)

        self._left_key_columns = left_key_columns
        self._right_key_columns = right_key_columns
        self._join_type = join_type_enum

        self._left_columns_suffix = left_columns_suffix
        self._right_columns_suffix = right_columns_suffix

        self._partition_size_hint = partition_size_hint
        self._aggregator_ray_remote_args = aggregator_ray_remote_args

    @staticmethod
    def _validate_schemas(
        left_op_schema: "Schema",
        right_op_schema: "Schema",
        left_key_column_names: Tuple[str],
        right_key_column_names: Tuple[str],
    ):
        def _col_names_as_str(keys: Sequence[str]):
            keys_joined = ", ".join(map(lambda k: f"'{k}'", keys))
            return f"[{keys_joined}]"

        if len(left_key_column_names) < 1:
            raise ValueError(
                f"At least 1 column name to join on has to be provided (got "
                f"{_col_names_as_str(left_key_column_names)})"
            )

        if len(left_key_column_names) != len(right_key_column_names):
            raise ValueError(
                f"Number of columns provided for left and right datasets has "
                f"to match (got {_col_names_as_str(left_key_column_names)} and "
                f"{_col_names_as_str(right_key_column_names)})"
            )

        def _get_key_column_types(schema: "Schema", keys: Tuple[str]):
            return (
                [
                    _type
                    for name, _type in zip(schema.names, schema.types)
                    if name in keys
                ]
                if schema
                else None
            )

        # Look up each side's key columns in its own schema
        left_op_key_cols = _get_key_column_types(
            left_op_schema, left_key_column_names
        )
        right_op_key_cols = _get_key_column_types(
            right_op_schema, right_key_column_names
        )

        if left_op_key_cols != right_op_key_cols:
            raise ValueError(
                f"Key columns are expected to be present and have the same types "
                "in both left and right operands of the join operation: "
                f"left has {left_op_schema}, but right has {right_op_schema}"
            )
```
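A small standalone illustration of the join-type validation above (the enum here mirrors the one defined in this file): constructing `JoinType` from a user-supplied string either succeeds or produces the descriptive error:

```python
from enum import Enum

class JoinType(Enum):  # mirrors the enum defined in this file
    INNER = "inner"
    LEFT_OUTER = "left_outer"
    RIGHT_OUTER = "right_outer"
    FULL_OUTER = "full_outer"

assert JoinType("inner") is JoinType.INNER  # valid strings map to enum members

try:
    JoinType("semi")  # unsupported join type
except ValueError:
    print(
        "Invalid join type: 'semi'. Supported join types are: "
        + ", ".join(jt.value for jt in JoinType)
    )
# -> Invalid join type: 'semi'. Supported join types are:
#    inner, left_outer, right_outer, full_outer
```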
