Skip to content

Commit 34ae461

Browse files
[Feat][Core/Dashboard] Convert StateHead to subprocess module (#51676)
Signed-off-by: Chi-Sheng Liu <chishengliu@chishengliu.com>
1 parent fb830d1 commit 34ae461

File tree

4 files changed

+29
-19
lines changed

4 files changed

+29
-19
lines changed

python/ray/dashboard/modules/state/state_head.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
import aiohttp.web
99
from aiohttp.web import Response
1010

11-
import ray.dashboard.optional_utils as dashboard_optional_utils
12-
import ray.dashboard.utils as dashboard_utils
11+
import ray
1312
from ray import ActorID
1413
from ray._private.ray_constants import env_integer
1514
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
@@ -28,12 +27,14 @@
2827
options_from_req,
2928
)
3029
from ray.dashboard.utils import RateLimitedModule
30+
from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
31+
from ray.dashboard.subprocesses.module import SubprocessModule
32+
from ray.dashboard.subprocesses.utils import ResponseType
3133
from ray.util.state.common import DEFAULT_LOG_LIMIT, DEFAULT_RPC_TIMEOUT, GetLogOptions
3234
from ray.util.state.exception import DataSourceUnavailable
3335
from ray.util.state.state_manager import StateDataSourceClient
3436

3537
logger = logging.getLogger(__name__)
36-
routes = dashboard_optional_utils.DashboardHeadRouteTable
3738

3839
# NOTE: Executor in this head is intentionally constrained to just 1 thread by
3940
# default to limit its concurrency, therefore reducing potential for
@@ -43,19 +44,16 @@
4344
)
4445

4546

46-
class StateHead(dashboard_utils.DashboardHeadModule, RateLimitedModule):
47+
class StateHead(SubprocessModule, RateLimitedModule):
4748
"""Module to obtain state information from the Ray cluster.
4849
4950
It is responsible for state observability APIs such as
5051
ray.list_actors(), ray.get_actor(), ray.summary_actors().
5152
"""
5253

53-
def __init__(
54-
self,
55-
config: dashboard_utils.DashboardHeadModuleConfig,
56-
):
54+
def __init__(self, *args, **kwargs):
5755
"""Initialize for handling RESTful requests from State API Client"""
58-
dashboard_utils.DashboardHeadModule.__init__(self, config)
56+
SubprocessModule.__init__(self, *args, **kwargs)
5957
# We don't allow users to configure too high a rate limit
6058
RateLimitedModule.__init__(
6159
self,
@@ -73,6 +71,10 @@ def __init__(
7371
thread_name_prefix="state_head_executor",
7472
)
7573

74+
# To make sure that the internal KV is initialized by getting the lazy property
75+
assert self.gcs_client is not None
76+
assert ray.experimental.internal_kv._internal_kv_initialized()
77+
7678
async def limit_handler_(self):
7779
return do_reply(
7880
success=False,
@@ -191,7 +193,7 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
191193

192194
return do_reply(success=True, error_message="", result=result)
193195

194-
@routes.get("/api/v0/logs/{media_type}")
196+
@routes.get("/api/v0/logs/{media_type}", resp_type=ResponseType.STREAM)
195197
@RateLimitedModule.enforce_max_concurrent_calls
196198
async def get_logs(self, req: aiohttp.web.Request):
197199
"""
@@ -333,7 +335,8 @@ async def delayed_response(self, req: aiohttp.web.Request):
333335
partial_failure_warning=None,
334336
)
335337

336-
async def run(self, server):
338+
async def run(self):
339+
await SubprocessModule.run(self)
337340
gcs_channel = self.aiogrpc_gcs_channel
338341
self._state_api_data_source_client = StateDataSourceClient(
339342
gcs_channel, self.gcs_aio_client
@@ -343,7 +346,3 @@ async def run(self, server):
343346
self._executor,
344347
)
345348
self._log_api = LogsManager(self._state_api_data_source_client)
346-
347-
@staticmethod
348-
def is_minimal_module():
349-
return False

python/ray/dashboard/subprocesses/module.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import ray
1313
from ray import ray_constants
1414
from ray._raylet import GcsClient
15-
from ray._private.gcs_utils import GcsAioClient
15+
from ray._private.gcs_utils import GcsAioClient, GcsChannel
1616
from ray._private.ray_logging import configure_log_file
1717
from ray._private.utils import open_log
1818
from ray.dashboard.subprocesses.utils import (
@@ -68,6 +68,7 @@ def __init__(
6868
# Lazy init
6969
self._gcs_client = None
7070
self._gcs_aio_client = None
71+
self._aiogrpc_gcs_channel = None
7172
self._parent_process_death_detection_task = None
7273
self._http_session = None
7374

@@ -159,6 +160,14 @@ def gcs_client(self):
159160
self._gcs_client = ray.experimental.internal_kv.internal_kv_get_gcs_client()
160161
return self._gcs_client
161162

163+
@property
164+
def aiogrpc_gcs_channel(self):
165+
if self._aiogrpc_gcs_channel is None:
166+
gcs_channel = GcsChannel(gcs_address=self._config.gcs_address, aio=True)
167+
gcs_channel.connect()
168+
self._aiogrpc_gcs_channel = gcs_channel.channel()
169+
return self._aiogrpc_gcs_channel
170+
162171
@property
163172
def session_name(self):
164173
"""

python/ray/dashboard/subprocesses/tests/utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def gcs_aio_client(self):
2424
def gcs_client(self):
2525
return None
2626

27+
@property
28+
def aiogrpc_gcs_channel(self):
29+
return None
30+
2731

2832
class TestModule(BaseTestModule):
2933
def __init__(self, *args, **kwargs):

python/ray/tests/test_state_api_log.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -999,9 +999,7 @@ def verify():
999999
with pytest.raises(requests.HTTPError) as e:
10001000
list_logs(node_id=node_id)
10011001

1002-
assert (
1003-
f"Agent for node id: {node_id} doesn't exist." in e.value.response.json()["msg"]
1004-
)
1002+
assert e.value.response.status_code == 500
10051003

10061004

10071005
@pytest.mark.skipif(

0 commit comments

Comments
 (0)