Skip to content

Commit cc972fd

Browse files
[Data] Cleaning up Operator Fusion rule (#52566)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? 1. Minor clean up of `FuseOperators` rule 2. Abstracted operator fusion rules into `test_operator_fusion` ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
1 parent ef2b05a commit cc972fd

File tree

8 files changed

+662
-627
lines changed

8 files changed

+662
-627
lines changed

python/ray/data/BUILD

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,20 @@ py_test(
704704
],
705705
)
706706

707+
py_test(
708+
name = "test_operator_fusion",
709+
size = "medium",
710+
srcs = ["tests/test_operator_fusion.py"],
711+
tags = [
712+
"exclusive",
713+
"team:data",
714+
],
715+
deps = [
716+
":conftest",
717+
"//:ray_lib",
718+
],
719+
)
720+
707721
py_test(
708722
name = "test_execution_optimizer",
709723
size = "medium",

python/ray/data/_internal/logical/optimizers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from ray.data._internal.logical.rules.inherit_target_max_block_size import (
1414
InheritTargetMaxBlockSizeRule,
1515
)
16-
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
16+
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
1717
from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule
1818
from ray.data._internal.logical.rules.set_read_parallelism import SetReadParallelismRule
1919
from ray.data._internal.logical.rules.zero_copy_map_fusion import (
@@ -35,7 +35,7 @@
3535
[
3636
InheritTargetMaxBlockSizeRule,
3737
SetReadParallelismRule,
38-
OperatorFusionRule,
38+
FuseOperators,
3939
EliminateBuildOutputBlocks,
4040
ConfigureMapTaskMemoryUsingOutputSize,
4141
]
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
1+
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
22
from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule
33

4-
__all__ = ["ReorderRandomizeBlocksRule", "OperatorFusionRule"]
4+
__all__ = ["ReorderRandomizeBlocksRule", "FuseOperators"]

python/ray/data/_internal/logical/rules/operator_fusion.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]
3939

4040

41-
class OperatorFusionRule(Rule):
41+
class FuseOperators(Rule):
4242
"""Fuses linear chains of compatible physical operators."""
4343

4444
def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
@@ -54,21 +54,21 @@ def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
5454
# Update output dependencies after fusion.
5555
# TODO(hchen): Instead of updating the depdencies manually,
5656
# we need a better abstraction for manipulating the DAG.
57-
self._remove_output_depes(fused_dag)
58-
self._update_output_depes(fused_dag)
57+
self._remove_output_deps(fused_dag)
58+
self._update_output_deps(fused_dag)
5959

6060
new_plan = PhysicalPlan(fused_dag, self._op_map, plan.context)
6161
return new_plan
6262

63-
def _remove_output_depes(self, op: PhysicalOperator) -> None:
63+
def _remove_output_deps(self, op: PhysicalOperator) -> None:
6464
for input in op._input_dependencies:
6565
input._output_dependencies = []
66-
self._remove_output_depes(input)
66+
self._remove_output_deps(input)
6767

68-
def _update_output_depes(self, op: PhysicalOperator) -> None:
68+
def _update_output_deps(self, op: PhysicalOperator) -> None:
6969
for input in op._input_dependencies:
7070
input._output_dependencies.append(op)
71-
self._update_output_depes(input)
71+
self._update_output_deps(input)
7272

7373
def _fuse_map_operators_in_dag(self, dag: PhysicalOperator) -> MapOperator:
7474
"""Starting at the given operator, traverses up the DAG of operators
@@ -305,6 +305,7 @@ def _get_fused_map_operator(
305305
# Merge minimum block sizes.
306306
down_min_rows_per_bundled_input = down_logical_op._min_rows_per_bundled_input
307307
up_min_rows_per_bundled_input = up_logical_op._min_rows_per_bundled_input
308+
308309
if (
309310
down_min_rows_per_bundled_input is not None
310311
and up_min_rows_per_bundled_input is not None

python/ray/data/_internal/logical/rules/zero_copy_map_fusion.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
)
1111
from ray.data._internal.logical.interfaces.optimizer import Rule
1212
from ray.data._internal.logical.interfaces.physical_plan import PhysicalPlan
13-
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
13+
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
1414

1515

1616
class ZeroCopyMapFusionRule(Rule):
@@ -27,7 +27,7 @@ class ZeroCopyMapFusionRule(Rule):
2727

2828
@classmethod
2929
def dependencies(cls) -> List[Type[Rule]]:
30-
return [OperatorFusionRule]
30+
return [FuseOperators]
3131

3232
def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
3333
self._traverse(plan.dag)

0 commit comments

Comments
 (0)