Skip to content

Commit d87d2fd

Browse files
committed
Address review comments
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
1 parent eb93950 commit d87d2fd

10 files changed

+71
-80
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ def pending_processor_usage(self) -> ExecutionResources:
542542
"""
543543
return ExecutionResources(0, 0, 0)
544544

545-
def min_max_resource_usage_bounds(
545+
def min_max_resource_requirements(
546546
self,
547547
) -> Tuple[ExecutionResources, ExecutionResources]:
548548
"""Returns the min and max resources to start the operator and make progress.

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

+10-15
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ def progress_str(self) -> str:
302302
)
303303
return "[locality off]"
304304

305-
def min_max_resource_usage_bounds(
305+
def min_max_resource_requirements(
306306
self,
307307
) -> Tuple[ExecutionResources, ExecutionResources]:
308308
min_actors = self._actor_pool.min_size()
@@ -316,34 +316,29 @@ def min_max_resource_usage_bounds(
316316
num_gpus_per_actor = self._ray_remote_args.get("num_gpus", 0)
317317

318318
# Compute the minimum resource usage.
319-
if num_gpus_per_actor > 0:
320-
# To ensure that all GPUs are utilized, reserve enough resource budget
321-
# to launch one task for each worker.
322-
min_object_store_memory = (
323-
self._metrics.obj_store_mem_max_pending_output_per_task * min_actors
324-
)
325-
else:
326-
# If the actors aren't using GPUs, only reserve memory for one task.
327-
min_object_store_memory = (
328-
self._metrics.obj_store_mem_max_pending_output_per_task * 1
329-
)
330319
min_resource_usage = ExecutionResources(
331320
cpu=num_cpus_per_actor * min_actors,
332321
gpu=num_gpus_per_actor * min_actors,
333-
object_store_memory=min_object_store_memory,
322+
# To ensure that all actors are utilized, reserve enough resource budget
323+
# to launch one task for each worker.
324+
object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task
325+
* min_actors,
334326
)
335327

336328
# Compute the maximum resource usage.
337329
if math.isinf(max_actors):
338330
max_resource_usage = ExecutionResources.for_limits()
339331
else:
340-
max_tasks_per_actor = self._actor_pool.max_tasks_in_flight_per_actor()
332+
max_concurrency = self._ray_remote_args.get("max_concurrency", 1)
333+
max_concurrent_tasks_per_actor = min(
334+
self._actor_pool.max_tasks_in_flight_per_actor(), max_concurrency
335+
)
341336
max_per_actor_resource_usage = ExecutionResources(
342337
cpu=num_cpus_per_actor,
343338
gpu=num_gpus_per_actor,
344339
object_store_memory=(
345340
self._metrics.obj_store_mem_max_pending_output_per_task
346-
* max_tasks_per_actor
341+
* max_concurrent_tasks_per_actor
347342
),
348343
)
349344
max_resource_usage = max_per_actor_resource_usage.scale(max_actors)

python/ray/data/_internal/execution/operators/map_operator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ def pending_processor_usage(self) -> ExecutionResources:
489489
raise NotImplementedError
490490

491491
@abstractmethod
492-
def min_max_resource_usage_bounds(
492+
def min_max_resource_requirements(
493493
self,
494494
) -> Tuple[ExecutionResources, ExecutionResources]:
495495
raise NotImplementedError

python/ray/data/_internal/execution/operators/task_pool_map_operator.py

+14-7
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def _add_bundled_input(self, bundle: RefBundle):
110110
def progress_str(self) -> str:
111111
return ""
112112

113-
def min_max_resource_usage_bounds(
113+
def min_max_resource_requirements(
114114
self,
115115
) -> Tuple[ExecutionResources, ExecutionResources]:
116116
return self._min_resource_usage(), self._max_resource_usage()
@@ -120,21 +120,28 @@ def _min_resource_usage(self) -> ExecutionResources:
120120
return self.incremental_resource_usage()
121121

122122
def _max_resource_usage(self) -> ExecutionResources:
123+
num_cpus_per_task = self._ray_remote_args.get("num_cpus", 0)
124+
num_gpus_per_task = self._ray_remote_args.get("num_gpus", 0)
125+
object_store_memory_per_task = (
126+
self._metrics.obj_store_mem_max_pending_output_per_task or 0
127+
)
128+
123129
if self._inputs_complete:
124130
# If the operator has already received all input data, we know it won't
125131
# launch more tasks. So, we only need to reserve resources for the tasks
126132
# that are currently running.
127-
num_cpus_per_task = self._ray_remote_args.get("num_cpus", 0)
128-
num_gpus_per_task = self._ray_remote_args.get("num_gpus", 0)
129-
object_store_memory_per_task = (
130-
self._metrics.obj_store_mem_max_pending_output_per_task or 0
131-
)
132-
resources = ExecutionResources.for_limits(
133+
resources = ExecutionResources(
133134
cpu=num_cpus_per_task * self.num_active_tasks(),
134135
gpu=num_gpus_per_task * self.num_active_tasks(),
135136
object_store_memory=object_store_memory_per_task
136137
* self.num_active_tasks(),
137138
)
139+
elif self._concurrency is not None:
140+
resources = ExecutionResources(
141+
cpu=num_cpus_per_task * self._concurrency,
142+
gpu=num_gpus_per_task * self._concurrency,
143+
object_store_memory=object_store_memory_per_task * self._concurrency,
144+
)
138145
else:
139146
resources = ExecutionResources.for_limits()
140147

python/ray/data/_internal/execution/resource_manager.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -418,9 +418,6 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
418418
# See `test_no_deadlock_on_small_cluster_resources` as an example.
419419
self._reserved_min_resources: Dict[PhysicalOperator, bool] = {}
420420

421-
self._cached_global_limits = ExecutionResources.zero()
422-
self._cached_num_eligible_ops = 0
423-
424421
self._idle_detector = self.IdleDetector()
425422

426423
def _is_op_eligible(self, op: PhysicalOperator) -> bool:
@@ -441,14 +438,6 @@ def _update_reservation(self):
441438
global_limits = self._resource_manager.get_global_limits()
442439
eligible_ops = self._get_eligible_ops()
443440

444-
if (
445-
global_limits == self._cached_global_limits
446-
and len(eligible_ops) == self._cached_num_eligible_ops
447-
):
448-
return
449-
self._cached_global_limits = global_limits
450-
self._cached_num_eligible_ops = len(eligible_ops)
451-
452441
self._op_reserved.clear()
453442
self._reserved_for_op_outputs.clear()
454443
self._reserved_min_resources.clear()
@@ -470,7 +459,7 @@ def _update_reservation(self):
470459
0, 0, default_reserved.object_store_memory / 2
471460
)
472461

473-
min_resource_usage, max_resource_usage = op.min_max_resource_usage_bounds()
462+
min_resource_usage, max_resource_usage = op.min_max_resource_requirements()
474463
reserved_for_tasks = default_reserved.subtract(reserved_for_outputs)
475464
reserved_for_tasks = reserved_for_tasks.max(min_resource_usage)
476465
reserved_for_tasks = reserved_for_tasks.min(max_resource_usage)

python/ray/data/_internal/execution/streaming_executor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ def walk(op):
457457

458458
base_usage = ExecutionResources(cpu=1)
459459
for op in walk(dag):
460-
min_resource_usage, _ = op.min_max_resource_usage_bounds()
460+
min_resource_usage, _ = op.min_max_resource_requirements()
461461
base_usage = base_usage.add(min_resource_usage)
462462

463463
if not base_usage.satisfies_limit(limits):

python/ray/data/tests/test_actor_pool_map_operator.py

+33-33
Original file line numberDiff line numberDiff line change
@@ -461,77 +461,72 @@ class MinMaxResourceUsageBoundsTestCase:
461461
min_size: int
462462
max_size: int
463463
obj_store_mem_max_pending_output_per_task: int
464-
max_tasks_in_flight: int
465464
expected_min_resource_usage_bound: ExecutionResources
466465
expected_max_resource_usage_bound: ExecutionResources
467-
num_gpus: int = 0
466+
max_tasks_in_flight: int = 4
467+
max_concurrency: int = 1
468468

469469

470470
@pytest.mark.parametrize(
471471
"case",
472472
[
473-
# Fixed-size CPU pool.
473+
# Fixed-size pool.
474474
MinMaxResourceUsageBoundsTestCase(
475475
min_size=2,
476476
max_size=2,
477477
obj_store_mem_max_pending_output_per_task=1,
478-
max_tasks_in_flight=4,
479478
expected_min_resource_usage_bound=ExecutionResources(
480-
cpu=2, object_store_memory=1
479+
cpu=2, object_store_memory=2
481480
),
482481
expected_max_resource_usage_bound=ExecutionResources(
483-
cpu=2, object_store_memory=2 * 4
482+
cpu=2, object_store_memory=2
484483
),
485484
),
486-
# Fixed-size GPU pool.
485+
# Autoscaling pool.
487486
MinMaxResourceUsageBoundsTestCase(
488-
min_size=2,
487+
min_size=1,
489488
max_size=2,
490-
num_gpus=1,
491-
max_tasks_in_flight=4,
492489
obj_store_mem_max_pending_output_per_task=1,
493-
# Unlike CPU pools, we should reserve enough object store memory so that
494-
# all actors can launch a task.
495490
expected_min_resource_usage_bound=ExecutionResources(
496-
cpu=2, gpu=2, object_store_memory=2
491+
cpu=1, object_store_memory=1
497492
),
498493
expected_max_resource_usage_bound=ExecutionResources(
499-
cpu=2, gpu=2, object_store_memory=2 * 4
494+
cpu=2, object_store_memory=2
500495
),
501496
),
502-
# Autoscaling CPU pool.
497+
# Unbounded pool.
503498
MinMaxResourceUsageBoundsTestCase(
504499
min_size=1,
505-
max_size=2,
506-
max_tasks_in_flight=4,
500+
max_size=None,
507501
obj_store_mem_max_pending_output_per_task=1,
508502
expected_min_resource_usage_bound=ExecutionResources(
509503
cpu=1, object_store_memory=1
510504
),
511-
expected_max_resource_usage_bound=ExecutionResources(
512-
cpu=2, object_store_memory=2 * 4
513-
),
505+
expected_max_resource_usage_bound=ExecutionResources.for_limits(),
514506
),
515-
# Unbounded CPU pool.
507+
# Multi-threaded pool.
516508
MinMaxResourceUsageBoundsTestCase(
517509
min_size=1,
518-
max_size=None,
519-
max_tasks_in_flight=4,
510+
max_size=1,
520511
obj_store_mem_max_pending_output_per_task=1,
512+
max_concurrency=2,
513+
max_tasks_in_flight=4,
521514
expected_min_resource_usage_bound=ExecutionResources(
522515
cpu=1, object_store_memory=1
523516
),
524-
expected_max_resource_usage_bound=ExecutionResources.for_limits(),
517+
expected_max_resource_usage_bound=ExecutionResources(
518+
cpu=1, object_store_memory=1 * 2
519+
),
525520
),
526521
],
527522
ids=[
528-
"fixed-size-cpu-pool",
529-
"fixed-size-gpu-pool",
530-
"autoscaling-cpu-pool",
531-
"unbounded-cpu-pool",
523+
"fixed-size-pool",
524+
"autoscaling-pool",
525+
"unbounded-pool",
526+
"multi-threaded-pool",
532527
],
533528
)
534-
def test_min_max_resource_usage_bounds(
529+
def test_min_max_resource_requirements(
535530
case, ray_start_regular_shared, restore_data_context
536531
):
537532
data_context = ray.data.DataContext.get_current()
@@ -545,7 +540,10 @@ def test_min_max_resource_usage_bounds(
545540
max_size=case.max_size,
546541
max_tasks_in_flight_per_actor=case.max_tasks_in_flight,
547542
),
548-
ray_remote_args={"num_cpus": 1, "num_gpus": case.num_gpus},
543+
ray_remote_args={
544+
"num_cpus": 1,
545+
"max_concurrency": case.max_concurrency,
546+
},
549547
)
550548
op._metrics = MagicMock(
551549
obj_store_mem_max_pending_output_per_task=case.obj_store_mem_max_pending_output_per_task
@@ -554,10 +552,12 @@ def test_min_max_resource_usage_bounds(
554552
(
555553
min_resource_usage_bound,
556554
max_resource_usage_bound,
557-
) = op.min_max_resource_usage_bounds()
555+
) = op.min_max_resource_requirements()
558556

559-
assert min_resource_usage_bound == case.expected_min_resource_usage_bound
560-
assert max_resource_usage_bound == case.expected_max_resource_usage_bound
557+
assert (
558+
min_resource_usage_bound == case.expected_min_resource_usage_bound
559+
and max_resource_usage_bound == case.expected_max_resource_usage_bound
560+
)
561561

562562

563563
def test_start_actor_timeout(ray_start_regular_shared, restore_data_context):

python/ray/data/tests/test_executor_resource_management.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def test_resource_canonicalization(ray_start_10_cpus_shared):
101101
name="TestMapper",
102102
compute_strategy=TaskPoolStrategy(),
103103
)
104-
min_resource_usage, _ = op.min_max_resource_usage_bounds()
104+
min_resource_usage, _ = op.min_max_resource_requirements()
105105
assert min_resource_usage == ExecutionResources()
106106
data_context = ray.data.DataContext.get_current()
107107
inc_obj_store_mem = (
@@ -123,7 +123,7 @@ def test_resource_canonicalization(ray_start_10_cpus_shared):
123123
compute_strategy=TaskPoolStrategy(),
124124
ray_remote_args={"num_gpus": 2},
125125
)
126-
min_resource_usage, _ = op.min_max_resource_usage_bounds()
126+
min_resource_usage, _ = op.min_max_resource_requirements()
127127
assert min_resource_usage == ExecutionResources()
128128
assert op.incremental_resource_usage() == ExecutionResources(
129129
cpu=0, gpu=2, object_store_memory=inc_obj_store_mem
@@ -138,7 +138,7 @@ def test_resource_canonicalization(ray_start_10_cpus_shared):
138138
compute_strategy=TaskPoolStrategy(),
139139
ray_remote_args={"num_gpus": 2, "num_cpus": 1},
140140
)
141-
min_resource_usage, _ = op.min_max_resource_usage_bounds()
141+
min_resource_usage, _ = op.min_max_resource_requirements()
142142
assert min_resource_usage == ExecutionResources()
143143
assert op.incremental_resource_usage() == ExecutionResources(
144144
cpu=1, gpu=2, object_store_memory=inc_obj_store_mem
@@ -312,7 +312,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
312312
data_context._max_num_blocks_in_streaming_gen_buffer
313313
* data_context.target_max_block_size
314314
)
315-
min_resource_usage, _ = op.min_max_resource_usage_bounds()
315+
min_resource_usage, _ = op.min_max_resource_requirements()
316316
assert min_resource_usage == ExecutionResources(cpu=2, gpu=0)
317317
# `incremental_resource_usage` should always report 0 CPU and GPU, as
318318
# it doesn't consider scaling-up.
@@ -408,7 +408,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
408408
data_context._max_num_blocks_in_streaming_gen_buffer
409409
* data_context.target_max_block_size
410410
)
411-
min_resource_usage, _ = op.min_max_resource_usage_bounds()
411+
min_resource_usage, _ = op.min_max_resource_requirements()
412412
assert min_resource_usage == ExecutionResources(cpu=2, gpu=0)
413413
# `incremental_resource_usage` should always report 0 CPU and GPU, as
414414
# it doesn't consider scaling-up.

python/ray/data/tests/test_resource_manager.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ def test_does_not_reserve_more_than_max_resource_usage(self):
556556
o1,
557557
DataContext.get_current(),
558558
)
559-
o2.min_max_resource_usage_bounds = MagicMock(
559+
o2.min_max_resource_requirements = MagicMock(
560560
return_value=(
561561
ExecutionResources(cpu=0, object_store_memory=0),
562562
ExecutionResources(cpu=1, object_store_memory=1),

python/ray/data/tests/test_task_pool_map_operator.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
)
1111

1212

13-
def test_min_max_resource_usage_bounds(ray_start_regular_shared, restore_data_context):
13+
def test_min_max_resource_requirements(ray_start_regular_shared, restore_data_context):
1414
data_context = ray.data.DataContext.get_current()
1515
op = TaskPoolMapOperator(
1616
map_transformer=MagicMock(),
@@ -24,7 +24,7 @@ def test_min_max_resource_usage_bounds(ray_start_regular_shared, restore_data_co
2424
(
2525
min_resource_usage_bound,
2626
max_resource_usage_bound,
27-
) = op.min_max_resource_usage_bounds()
27+
) = op.min_max_resource_requirements()
2828

2929
assert (
3030
# At a minimum, you need enough processors to run one task and enough object
@@ -36,7 +36,7 @@ def test_min_max_resource_usage_bounds(ray_start_regular_shared, restore_data_co
3636
)
3737

3838

39-
def test_min_max_resource_usage_bounds_with_inputs_complete(
39+
def test_min_max_resource_requirements_with_inputs_complete(
4040
ray_start_regular_shared, restore_data_context
4141
):
4242
data_context = ray.data.DataContext.get_current()
@@ -54,7 +54,7 @@ def test_min_max_resource_usage_bounds_with_inputs_complete(
5454
(
5555
min_resource_usage_bound,
5656
max_resource_usage_bound,
57-
) = op.min_max_resource_usage_bounds()
57+
) = op.min_max_resource_requirements()
5858

5959
assert min_resource_usage_bound == ExecutionResources(cpu=1, object_store_memory=1)
6060
# If the operator is done receiving inputs, it knows it doesn't need more resources

0 commit comments

Comments (0)