Skip to content

Commit 6165021

Browse files
authored
[Data] Adding in metrics for number of actors alive, pending and restarting (#51082)
## Why are these changes needed? Currently we have information about actors that is only displayed in the progress bar (see [here](https://github.com/ray-project/ray/blob/678a8d75b47730d6685341a61bc8d14e924a61f8/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py#L783)), this adds corresponding metrics so they can be included in prometheus / grafana. --------- Signed-off-by: Matthew Owen <mowen@anyscale.com>
1 parent ae7340d commit 6165021

File tree

5 files changed

+156
-101
lines changed

5 files changed

+156
-101
lines changed

python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class MetricsGroup(Enum):
3535
TASKS = "tasks"
3636
OBJECT_STORE_MEMORY = "object_store_memory"
3737
MISC = "misc"
38+
ACTORS = "actors"
3839

3940

4041
@dataclass(frozen=True)
@@ -324,6 +325,23 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
324325
metrics_group=MetricsGroup.TASKS,
325326
)
326327

328+
# === Actor-related metrics ===
329+
num_alive_actors: int = metric_field(
330+
default=0,
331+
description="Number of alive actors.",
332+
metrics_group=MetricsGroup.ACTORS,
333+
)
334+
num_restarting_actors: int = metric_field(
335+
default=0,
336+
description="Number of restarting actors.",
337+
metrics_group=MetricsGroup.ACTORS,
338+
)
339+
num_pending_actors: int = metric_field(
340+
default=0,
341+
description="Number of pending actors.",
342+
metrics_group=MetricsGroup.ACTORS,
343+
)
344+
327345
# === Object store memory metrics ===
328346
obj_store_mem_internal_inqueue_blocks: int = metric_field(
329347
default=0,

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
2+
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
33
import uuid
44

55
import ray
@@ -569,3 +569,12 @@ def actor_info_progress_str(self) -> str:
569569
Actors.
570570
"""
571571
return ""
572+
573+
def actor_info_counts(self) -> Tuple[int, int, int]:
574+
"""Returns Actor counts for Alive, Restarting and Pending Actors.
575+
576+
This method will be called in add_output API in OpState. Subclasses can
577+
override it to return counts for Alive, Restarting and Pending
578+
Actors.
579+
"""
580+
return 0, 0, 0

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,10 @@ def actor_info_progress_str(self) -> str:
386386
"""Returns Actor progress strings for Alive, Restarting and Pending Actors."""
387387
return self._actor_pool.actor_info_progress_str()
388388

389+
def actor_info_counts(self) -> Tuple[int, int, int]:
390+
"""Returns Actor counts for Alive, Restarting and Pending Actors."""
391+
return self._actor_pool.actor_info_counts()
392+
389393

390394
class _MapWorker:
391395
"""An actor worker for MapOperator."""
@@ -780,11 +784,16 @@ def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
780784
"""
781785
return bundle.get_cached_location()
782786

783-
def actor_info_progress_str(self) -> str:
784-
"""Returns Actor progress strings for Alive, Restarting and Pending Actors."""
787+
def actor_info_counts(self) -> Tuple[int, int, int]:
788+
"""Returns Actor counts for Alive, Restarting and Pending Actors."""
785789
alive = self.num_alive_actors()
786790
pending = self.num_pending_actors()
787791
restarting = self.num_restarting_actors()
792+
return alive, pending, restarting
793+
794+
def actor_info_progress_str(self) -> str:
795+
"""Returns Actor progress strings for Alive, Restarting and Pending Actors."""
796+
alive, pending, restarting = self.actor_info_counts()
788797
total = alive + pending + restarting
789798
if total == alive:
790799
return f"; Actors: {total}"

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ def add_output(self, ref: RefBundle) -> None:
253253
ref.num_rows() is not None
254254
), "RefBundle must have a valid number of rows"
255255
self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())
256+
active, restarting, pending = self.op.actor_info_counts()
257+
self.op.metrics.num_alive_actors = active
258+
self.op.metrics.num_restarting_actors = restarting
259+
self.op.metrics.num_pending_actors = pending
256260

257261
def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
258262
"""Update the console with the latest operator progress."""

python/ray/data/tests/test_stats.py

Lines changed: 113 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ def gen_expected_metrics(
7474
"'task_submission_backpressure_time': "
7575
f"{'N' if task_backpressure else 'Z'}"
7676
),
77+
"'num_alive_actors': Z",
78+
"'num_restarting_actors': Z",
79+
"'num_pending_actors': Z",
7780
"'obj_store_mem_internal_inqueue_blocks': Z",
7881
"'obj_store_mem_internal_outqueue_blocks': Z",
7982
"'obj_store_mem_freed': N",
@@ -94,6 +97,9 @@ def gen_expected_metrics(
9497
"'task_submission_backpressure_time': "
9598
f"{'N' if task_backpressure else 'Z'}"
9699
),
100+
"'num_alive_actors': Z",
101+
"'num_restarting_actors': Z",
102+
"'num_pending_actors': Z",
97103
"'obj_store_mem_internal_inqueue_blocks': Z",
98104
"'obj_store_mem_internal_outqueue_blocks': Z",
99105
"'obj_store_mem_used': A",
@@ -566,6 +572,9 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
566572
" num_tasks_failed: Z,\n"
567573
" block_generation_time: N,\n"
568574
" task_submission_backpressure_time: N,\n"
575+
" num_alive_actors: Z,\n"
576+
" num_restarting_actors: Z,\n"
577+
" num_pending_actors: Z,\n"
569578
" obj_store_mem_internal_inqueue_blocks: Z,\n"
570579
" obj_store_mem_internal_outqueue_blocks: Z,\n"
571580
" obj_store_mem_freed: N,\n"
@@ -681,6 +690,9 @@ def check_stats():
681690
" num_tasks_failed: Z,\n"
682691
" block_generation_time: N,\n"
683692
" task_submission_backpressure_time: N,\n"
693+
" num_alive_actors: Z,\n"
694+
" num_restarting_actors: Z,\n"
695+
" num_pending_actors: Z,\n"
684696
" obj_store_mem_internal_inqueue_blocks: Z,\n"
685697
" obj_store_mem_internal_outqueue_blocks: Z,\n"
686698
" obj_store_mem_freed: N,\n"
@@ -751,6 +763,9 @@ def check_stats():
751763
" num_tasks_failed: Z,\n"
752764
" block_generation_time: N,\n"
753765
" task_submission_backpressure_time: N,\n"
766+
" num_alive_actors: Z,\n"
767+
" num_restarting_actors: Z,\n"
768+
" num_pending_actors: Z,\n"
754769
" obj_store_mem_internal_inqueue_blocks: Z,\n"
755770
" obj_store_mem_internal_outqueue_blocks: Z,\n"
756771
" obj_store_mem_freed: N,\n"
@@ -1370,6 +1385,104 @@ def time_to_seconds(time_str):
13701385
assert isclose(percent, time_s / total_time * 100, rel_tol=0.01)
13711386

13721387

1388+
def test_per_node_metrics_basic(ray_start_regular_shared, restore_data_context):
1389+
"""Basic test to ensure per-node metrics are populated."""
1390+
ctx = DataContext.get_current()
1391+
ctx.enable_per_node_metrics = True
1392+
1393+
def _sum_net_metrics(per_node_metrics: Dict[str, NodeMetrics]) -> Dict[str, float]:
1394+
sum_metrics = defaultdict(float)
1395+
for metrics in per_node_metrics.values():
1396+
for metric, value in metrics.items():
1397+
sum_metrics[metric] += value
1398+
return sum_metrics
1399+
1400+
with patch("ray.data._internal.stats.StatsManager._stats_actor") as mock_get_actor:
1401+
mock_actor_handle = MagicMock()
1402+
mock_get_actor.return_value = mock_actor_handle
1403+
1404+
ds = ray.data.range(20).map_batches(lambda batch: batch).materialize()
1405+
metrics = ds._plan.stats().extra_metrics
1406+
1407+
calls = mock_actor_handle.update_execution_metrics.remote.call_args_list
1408+
assert len(calls) > 0
1409+
1410+
last_args, _ = calls[-1]
1411+
per_node_metrics = last_args[-1]
1412+
1413+
assert isinstance(per_node_metrics, dict)
1414+
assert len(per_node_metrics) >= 1
1415+
1416+
for nm in per_node_metrics.values():
1417+
for f in fields(NodeMetrics):
1418+
assert f.name in nm
1419+
1420+
# basic checks to make sure metrics are populated
1421+
assert any(nm["num_tasks_finished"] > 0 for nm in per_node_metrics.values())
1422+
assert any(
1423+
nm["bytes_outputs_of_finished_tasks"] > 0
1424+
for nm in per_node_metrics.values()
1425+
)
1426+
assert any(
1427+
nm["blocks_outputs_of_finished_tasks"] > 0
1428+
for nm in per_node_metrics.values()
1429+
)
1430+
1431+
net_metrics = _sum_net_metrics(per_node_metrics)
1432+
assert net_metrics["num_tasks_finished"] == metrics["num_tasks_finished"]
1433+
assert (
1434+
net_metrics["bytes_outputs_of_finished_tasks"]
1435+
== metrics["bytes_outputs_of_finished_tasks"]
1436+
)
1437+
1438+
1439+
@pytest.mark.parametrize("enable_metrics", [True, False])
1440+
def test_per_node_metrics_toggle(
1441+
ray_start_regular_shared, restore_data_context, enable_metrics
1442+
):
1443+
ctx = DataContext.get_current()
1444+
ctx.enable_per_node_metrics = enable_metrics
1445+
1446+
with patch("ray.data._internal.stats.StatsManager._stats_actor") as mock_get_actor:
1447+
mock_actor_handle = MagicMock()
1448+
mock_get_actor.return_value = mock_actor_handle
1449+
1450+
ray.data.range(10000).map(lambda x: x).materialize()
1451+
1452+
calls = mock_actor_handle.update_execution_metrics.remote.call_args_list
1453+
assert len(calls) > 0
1454+
1455+
last_args, _ = calls[-1]
1456+
per_node_metrics = last_args[-1]
1457+
1458+
if enable_metrics:
1459+
assert per_node_metrics is not None
1460+
else:
1461+
assert per_node_metrics is None
1462+
1463+
1464+
def test_task_duration_stats():
1465+
"""Test that OpTaskDurationStats correctly tracks running statistics using Welford's algorithm."""
1466+
stats = TaskDurationStats()
1467+
1468+
# Test initial state
1469+
assert stats.count() == 0
1470+
assert stats.mean() == 0.0
1471+
assert stats.stddev() == 0.0
1472+
1473+
# Add some task durations and verify stats
1474+
durations = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
1475+
for d in durations:
1476+
stats.add_duration(d)
1477+
1478+
# Compare with numpy's implementations
1479+
assert stats.count() == len(durations)
1480+
assert pytest.approx(stats.mean()) == np.mean(durations)
1481+
assert pytest.approx(stats.stddev()) == np.std(
1482+
durations, ddof=1
1483+
) # ddof=1 for sample standard deviation
1484+
1485+
13731486
# NOTE: All tests above share a Ray cluster, while the tests below do not. These
13741487
# tests should only be carefully reordered to retain this invariant!
13751488

@@ -1709,104 +1822,6 @@ def update_stats_manager(i):
17091822
wait_for_condition(lambda: not StatsManager._update_thread.is_alive())
17101823

17111824

1712-
def test_per_node_metrics_basic(ray_start_regular_shared, restore_data_context):
1713-
"""Basic test to ensure per-node metrics are populated."""
1714-
ctx = DataContext.get_current()
1715-
ctx.enable_per_node_metrics = True
1716-
1717-
def _sum_net_metrics(per_node_metrics: Dict[str, NodeMetrics]) -> Dict[str, float]:
1718-
sum_metrics = defaultdict(float)
1719-
for metrics in per_node_metrics.values():
1720-
for metric, value in metrics.items():
1721-
sum_metrics[metric] += value
1722-
return sum_metrics
1723-
1724-
with patch("ray.data._internal.stats.StatsManager._stats_actor") as mock_get_actor:
1725-
mock_actor_handle = MagicMock()
1726-
mock_get_actor.return_value = mock_actor_handle
1727-
1728-
ds = ray.data.range(20).map_batches(lambda batch: batch).materialize()
1729-
metrics = ds._plan.stats().extra_metrics
1730-
1731-
calls = mock_actor_handle.update_execution_metrics.remote.call_args_list
1732-
assert len(calls) > 0
1733-
1734-
last_args, _ = calls[-1]
1735-
per_node_metrics = last_args[-1]
1736-
1737-
assert isinstance(per_node_metrics, dict)
1738-
assert len(per_node_metrics) >= 1
1739-
1740-
for nm in per_node_metrics.values():
1741-
for f in fields(NodeMetrics):
1742-
assert f.name in nm
1743-
1744-
# basic checks to make sure metrics are populated
1745-
assert any(nm["num_tasks_finished"] > 0 for nm in per_node_metrics.values())
1746-
assert any(
1747-
nm["bytes_outputs_of_finished_tasks"] > 0
1748-
for nm in per_node_metrics.values()
1749-
)
1750-
assert any(
1751-
nm["blocks_outputs_of_finished_tasks"] > 0
1752-
for nm in per_node_metrics.values()
1753-
)
1754-
1755-
net_metrics = _sum_net_metrics(per_node_metrics)
1756-
assert net_metrics["num_tasks_finished"] == metrics["num_tasks_finished"]
1757-
assert (
1758-
net_metrics["bytes_outputs_of_finished_tasks"]
1759-
== metrics["bytes_outputs_of_finished_tasks"]
1760-
)
1761-
1762-
1763-
@pytest.mark.parametrize("enable_metrics", [True, False])
1764-
def test_per_node_metrics_toggle(
1765-
ray_start_regular_shared, restore_data_context, enable_metrics
1766-
):
1767-
ctx = DataContext.get_current()
1768-
ctx.enable_per_node_metrics = enable_metrics
1769-
1770-
with patch("ray.data._internal.stats.StatsManager._stats_actor") as mock_get_actor:
1771-
mock_actor_handle = MagicMock()
1772-
mock_get_actor.return_value = mock_actor_handle
1773-
1774-
ray.data.range(10000).map(lambda x: x).materialize()
1775-
1776-
calls = mock_actor_handle.update_execution_metrics.remote.call_args_list
1777-
assert len(calls) > 0
1778-
1779-
last_args, _ = calls[-1]
1780-
per_node_metrics = last_args[-1]
1781-
1782-
if enable_metrics:
1783-
assert per_node_metrics is not None
1784-
else:
1785-
assert per_node_metrics is None
1786-
1787-
1788-
def test_task_duration_stats():
1789-
"""Test that OpTaskDurationStats correctly tracks running statistics using Welford's algorithm."""
1790-
stats = TaskDurationStats()
1791-
1792-
# Test initial state
1793-
assert stats.count() == 0
1794-
assert stats.mean() == 0.0
1795-
assert stats.stddev() == 0.0
1796-
1797-
# Add some task durations and verify stats
1798-
durations = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
1799-
for d in durations:
1800-
stats.add_duration(d)
1801-
1802-
# Compare with numpy's implementations
1803-
assert stats.count() == len(durations)
1804-
assert pytest.approx(stats.mean()) == np.mean(durations)
1805-
assert pytest.approx(stats.stddev()) == np.std(
1806-
durations, ddof=1
1807-
) # ddof=1 for sample standard deviation
1808-
1809-
18101825
if __name__ == "__main__":
18111826
import sys
18121827

0 commit comments

Comments (0)