@@ -250,7 +250,12 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str:
250
250
budget = self ._op_resource_allocator ._op_budgets [op ]
251
251
usage_str += f", budget=(cpu={ budget .cpu :.1f} "
252
252
usage_str += f",gpu={ budget .gpu :.1f} "
253
- usage_str += f",object store={ budget .object_store_memory_str ()} )"
253
+ usage_str += f",obj_store={ budget .object_store_memory_str ()} "
254
+ # Remaining memory budget for producing new task outputs.
255
+ reserved_for_output = memory_string (
256
+ self ._op_resource_allocator ._output_budgets .get (op , 0 )
257
+ )
258
+ usage_str += f",out={ reserved_for_output } )"
254
259
return usage_str
255
260
256
261
def op_resource_allocator_enabled (self ) -> bool :
@@ -405,6 +410,8 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
405
410
self ._total_shared = ExecutionResources .zero ()
406
411
# Resource budgets for each operator, excluding `_reserved_for_op_outputs`.
407
412
self ._op_budgets : Dict [PhysicalOperator , ExecutionResources ] = {}
413
+ # Remaining memory budget for generating new task outputs, per operator.
414
+ self ._output_budgets : Dict [PhysicalOperator , float ] = {}
408
415
# Whether each operator has reserved the minimum resources to run
409
416
# at least one task.
410
417
# This is used to avoid edge cases where the entire resource limits are not
@@ -553,12 +560,14 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
553
560
op_outputs_usage = self ._get_op_outputs_usage_with_downstream (op )
554
561
res += max (self ._reserved_for_op_outputs [op ] - op_outputs_usage , 0 )
555
562
if math .isinf (res ):
563
+ self ._output_budgets [op ] = res
556
564
return None
557
565
558
566
res = int (res )
559
567
assert res >= 0
560
568
if res == 0 and self ._should_unblock_streaming_output_backpressure (op ):
561
569
res = 1
570
+ self ._output_budgets [op ] = res
562
571
return res
563
572
564
573
def _get_downstream_ineligible_ops (
0 commit comments