Skip to content

Commit 2765db7

Browse files
[Feat][Core/Dashboard] Convert EventHead to subprocess module (#51587)
Signed-off-by: Chi-Sheng Liu <chishengliu@chishengliu.com>
1 parent 07c24e9 commit 2765db7

File tree

5 files changed

+94
-11
lines changed

5 files changed

+94
-11
lines changed

python/ray/dashboard/modules/event/event_head.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import aiohttp.web
1212

13+
import ray
1314
import ray.dashboard.optional_utils as dashboard_optional_utils
1415
import ray.dashboard.utils as dashboard_utils
1516
from ray._common.utils import get_or_create_event_loop
@@ -22,10 +23,11 @@
2223
)
2324
from ray.dashboard.modules.event.event_utils import monitor_events, parse_event_strings
2425
from ray.dashboard.state_api_utils import do_filter, handle_list_api
26+
from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
27+
from ray.dashboard.subprocesses.module import SubprocessModule
2528
from ray.util.state.common import ClusterEventState, ListApiOptions, ListApiResponse
2629

2730
logger = logging.getLogger(__name__)
28-
routes = dashboard_optional_utils.DashboardHeadRouteTable
2931

3032
JobEvents = OrderedDict
3133
dashboard_utils._json_compatible_types.add(JobEvents)
@@ -79,11 +81,11 @@ def transform(all_events) -> ListApiResponse:
7981

8082

8183
class EventHead(
82-
dashboard_utils.DashboardHeadModule,
84+
SubprocessModule,
8385
dashboard_utils.RateLimitedModule,
8486
):
85-
def __init__(self, config: dashboard_utils.DashboardHeadModuleConfig):
86-
dashboard_utils.DashboardHeadModule.__init__(self, config)
87+
def __init__(self, *args, **kwargs):
88+
SubprocessModule.__init__(self, *args, **kwargs)
8789
dashboard_utils.RateLimitedModule.__init__(
8890
self,
8991
min(
@@ -105,6 +107,10 @@ def __init__(self, config: dashboard_utils.DashboardHeadModuleConfig):
105107
thread_name_prefix="event_head_executor",
106108
)
107109

110+
# To init gcs_client in internal_kv for record_extra_usage_tag.
111+
assert self.gcs_client is not None
112+
assert ray.experimental.internal_kv._internal_kv_initialized()
113+
108114
async def limit_handler_(self):
109115
return dashboard_optional_utils.rest_response(
110116
status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
@@ -148,8 +154,15 @@ async def report_events(self, request):
148154
The request body is a JSON array of event strings in type string.
149155
Response should contain {"success": true}.
150156
"""
151-
request_body: List[str] = await request.json()
152-
events = [parse_event_strings(event_str) for event_str in request_body]
157+
try:
158+
request_body: List[str] = await request.json()
159+
except Exception as e:
160+
logger.warning(f"Failed to parse request body: {request=}, {e=}")
161+
raise aiohttp.web.HTTPBadRequest()
162+
if not isinstance(request_body, list):
163+
logger.warning(f"Request body is not a list, {request_body=}")
164+
raise aiohttp.web.HTTPBadRequest()
165+
events = parse_event_strings(request_body)
153166
logger.debug("Received %d events", len(events))
154167
self._update_events(events)
155168
self.total_report_events_count += 1
@@ -208,13 +221,10 @@ async def list_api_fn(option: ListApiOptions):
208221

209222
return await handle_list_api(list_api_fn, req)
210223

211-
async def run(self, server):
224+
async def run(self):
225+
await super().run()
212226
self._monitor = monitor_events(
213227
self._event_dir,
214228
lambda data: self._update_events(parse_event_strings(data)),
215229
self._executor,
216230
)
217-
218-
@staticmethod
219-
def is_minimal_module():
220-
return False

python/ray/dashboard/modules/event/tests/test_event.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,31 @@ def _check_events():
231231
wait_for_condition(_check_events, timeout=15)
232232

233233

234+
def test_report_events(ray_start_with_dashboard):
235+
assert wait_until_server_available(ray_start_with_dashboard["webui_url"])
236+
webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
237+
url = f"{webui_url}/report_events"
238+
239+
resp = requests.post(url)
240+
assert resp.status_code == 400
241+
resp = requests.post(url, json={"Hello": "World"})
242+
assert resp.status_code == 400
243+
244+
job_id = ray.JobID.from_int(100).hex()
245+
sample_event = _get_event("Hello", job_id=job_id)
246+
resp = requests.post(url, json=[json.dumps(sample_event)])
247+
assert resp.status_code == 200
248+
249+
resp = requests.get(f"{webui_url}/events")
250+
assert resp.status_code == 200
251+
result = resp.json()
252+
all_events = result["data"]["events"]
253+
assert len(all_events) == 1
254+
assert job_id in all_events
255+
assert len(all_events[job_id]) == 1
256+
assert all_events[job_id][0]["message"] == "Hello"
257+
258+
234259
@pytest.mark.asyncio
235260
async def test_monitor_events():
236261
with tempfile.TemporaryDirectory() as temp_dir:

python/ray/dashboard/subprocesses/module.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ def session_name(self):
163163
"""
164164
return self._config.session_name
165165

166+
@property
167+
def log_dir(self):
168+
return self._config.log_dir
169+
166170
@property
167171
def http_session(self):
168172
if self._http_session is None:

python/ray/dashboard/subprocesses/tests/test_e2e.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,32 @@ async def test_load_multiple_modules(aiohttp_client, default_module_config):
115115
assert await response.text() == "Hello from TestModule1"
116116

117117

118+
async def test_cached_endpoint(aiohttp_client, default_module_config):
119+
"""
120+
Test whether the ray.dashboard.optional_utils.aiohttp_cache decorator works.
121+
"""
122+
app = await start_http_server_app(default_module_config, [TestModule])
123+
client = await aiohttp_client(app)
124+
125+
response = await client.get("/not_cached")
126+
assert response.status == 200
127+
assert await response.text() == "Hello, World from GET /not_cached, count: 1"
128+
129+
# Call again, count should increase.
130+
response = await client.get("/not_cached")
131+
assert response.status == 200
132+
assert await response.text() == "Hello, World from GET /not_cached, count: 2"
133+
134+
response = await client.get("/cached")
135+
assert response.status == 200
136+
assert await response.text() == "Hello, World from GET /cached, count: 1"
137+
138+
# Call again, count should NOT increase.
139+
response = await client.get("/cached")
140+
assert response.status == 200
141+
assert await response.text() == "Hello, World from GET /cached, count: 1"
142+
143+
118144
async def test_streamed_iota(aiohttp_client, default_module_config):
119145
# TODO(ryw): also test streams that raise exceptions.
120146
app = await start_http_server_app(default_module_config, [TestModule])

python/ray/dashboard/subprocesses/tests/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import signal
66
from typing import AsyncIterator
77

8+
from ray.dashboard import optional_utils
89
from ray.dashboard.optional_deps import aiohttp
910

1011
from ray.dashboard.subprocesses.module import SubprocessModule
@@ -28,6 +29,8 @@ class TestModule(BaseTestModule):
2829
def __init__(self, *args, **kwargs):
2930
super().__init__(*args, **kwargs)
3031
self.run_finished = False
32+
self.not_cached_count = 0
33+
self.cached_count = 0
3134

3235
async def run(self):
3336
await super().run()
@@ -49,6 +52,21 @@ async def echo(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
4952
body = await req.text()
5053
return aiohttp.web.Response(text="Hello, World from POST /echo from " + body)
5154

55+
@routes.get("/not_cached")
56+
async def not_cached(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
57+
self.not_cached_count += 1
58+
return aiohttp.web.Response(
59+
text=f"Hello, World from GET /not_cached, count: {self.not_cached_count}"
60+
)
61+
62+
@routes.get("/cached")
63+
@optional_utils.aiohttp_cache
64+
async def cached(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
65+
self.cached_count += 1
66+
return aiohttp.web.Response(
67+
text=f"Hello, World from GET /cached, count: {self.cached_count}"
68+
)
69+
5270
@routes.put("/error")
5371
async def make_error(self, req: aiohttp.web.Request) -> aiohttp.web.Response:
5472
raise ValueError("This is an error")

0 commit comments

Comments
 (0)