Skip to content

Commit ae7340d

Browse files
[Feat][Core/Dashboard] Add SubprocessModules to the Dashboard routes, and convert HealthzHead (#51282)
Signed-off-by: Chi-Sheng Liu <chishengliu@chishengliu.com>
1 parent 8c1f77a commit ae7340d

File tree

14 files changed

+350
-98
lines changed

14 files changed

+350
-98
lines changed

python/ray/_private/node.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@
2626
from ray._raylet import GcsClient, get_session_key_from_storage
2727
from ray._private.resource_spec import ResourceSpec
2828
from ray._private.services import serialize_config, get_address
29-
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink
29+
from ray._private.utils import (
30+
open_log,
31+
try_to_create_directory,
32+
try_to_symlink,
33+
validate_socket_filepath,
34+
)
3035

3136
# Logger for this module. It should be configured at the entry point
3237
# into the program using Ray. Ray configures it by default automatically
@@ -989,7 +994,6 @@ def _prepare_socket_file(self, socket_path: str, default_prefix: str):
989994
socket_path: the socket file to prepare.
990995
"""
991996
result = socket_path
992-
is_mac = sys.platform.startswith("darwin")
993997
if sys.platform == "win32":
994998
if socket_path is None:
995999
result = f"tcp://{self._localhost}:{self._get_unused_port()}"
@@ -1001,12 +1005,7 @@ def _prepare_socket_file(self, socket_path: str, default_prefix: str):
10011005
else:
10021006
try_to_create_directory(os.path.dirname(socket_path))
10031007

1004-
# Check socket path length to make sure it's short enough
1005-
maxlen = (104 if is_mac else 108) - 1 # sockaddr_un->sun_path
1006-
if len(result.split("://", 1)[-1].encode("utf-8")) > maxlen:
1007-
raise OSError(
1008-
f"AF_UNIX path length cannot exceed {maxlen} bytes: {result!r}"
1009-
)
1008+
validate_socket_filepath(result.split("://", 1)[-1])
10101009
return result
10111010

10121011
def _get_cached_port(

python/ray/_private/usage/usage_lib.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ def get_total_num_running_jobs_to_report(gcs_client) -> Optional[int]:
548548
total_num_running_jobs += 1
549549
return total_num_running_jobs
550550
except Exception as e:
551-
logger.info(f"Faile to query number of running jobs in the cluster: {e}")
551+
logger.info(f"Failed to query number of running jobs in the cluster: {e}")
552552
return None
553553

554554

@@ -562,7 +562,7 @@ def get_total_num_nodes_to_report(gcs_client, timeout=None) -> Optional[int]:
562562
total_num_nodes += 1
563563
return total_num_nodes
564564
except Exception as e:
565-
logger.info(f"Faile to query number of nodes in the cluster: {e}")
565+
logger.info(f"Failed to query number of nodes in the cluster: {e}")
566566
return None
567567

568568

@@ -593,24 +593,24 @@ def get_extra_usage_tags_to_report(gcs_client) -> Dict[str, str]:
593593
k, v = kv.split("=")
594594
extra_usage_tags[k] = v
595595
except Exception as e:
596-
logger.info(f"Failed to parse extra usage tags env var. Error: {e}")
596+
logger.info(f"Failed to parse extra usage tags env var: {e}")
597597

598598
valid_tag_keys = [tag_key.lower() for tag_key in TagKey.keys()]
599599
try:
600600
keys = gcs_client.internal_kv_keys(
601601
usage_constant.EXTRA_USAGE_TAG_PREFIX.encode(),
602602
namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
603603
)
604-
for key in keys:
605-
value = gcs_client.internal_kv_get(
606-
key, namespace=usage_constant.USAGE_STATS_NAMESPACE.encode()
607-
)
604+
kv = gcs_client.internal_kv_multi_get(
605+
keys, namespace=usage_constant.USAGE_STATS_NAMESPACE.encode()
606+
)
607+
for key, value in kv.items():
608608
key = key.decode("utf-8")
609609
key = key[len(usage_constant.EXTRA_USAGE_TAG_PREFIX) :]
610610
assert key in valid_tag_keys
611611
extra_usage_tags[key] = value.decode("utf-8")
612612
except Exception as e:
613-
logger.info(f"Failed to get extra usage tags from kv store {e}")
613+
logger.info(f"Failed to get extra usage tags from kv store: {e}")
614614
return extra_usage_tags
615615

616616

python/ray/_private/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,3 +1925,18 @@ def get_current_node_cpu_model_name() -> Optional[str]:
19251925
except Exception:
19261926
logger.debug("Failed to get CPU model name", exc_info=True)
19271927
return None
1928+
1929+
1930+
def validate_socket_filepath(filepath: str):
    """Check that ``filepath`` is short enough to be used as an AF_UNIX socket path.

    The path is measured in UTF-8 encoded bytes, not characters, and compared
    against a platform-dependent limit (103 bytes on macOS, 107 bytes on other
    non-Windows platforms).

    Args:
        filepath: The filesystem path of the socket, without any ``scheme://``
            prefix.

    Raises:
        OSError: If the encoded path is longer than the platform limit.
    """
    # Don't check for Windows as it doesn't support Unix sockets.
    if sys.platform == "win32":
        return

    # sockaddr_un->sun_path is 104 bytes on macOS and 108 elsewhere; one byte
    # is held back from that size.
    is_mac = sys.platform.startswith("darwin")
    maxlen = (104 if is_mac else 108) - 1

    if len(filepath.encode("utf-8")) > maxlen:
        # Use the repr of the path so that whitespace or control characters in
        # the offending value are visible in the error message.
        raise OSError(
            "validate_socket_filepath failed: AF_UNIX path length cannot "
            f"exceed {maxlen} bytes: {filepath!r}"
        )

python/ray/dashboard/dashboard.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ class Dashboard:
4444
serve_frontend: If configured, frontend HTML
4545
is not served from the dashboard.
4646
log_dir: Log directory of dashboard.
47+
logging_level: The logging level (e.g. logging.INFO, logging.DEBUG)
48+
logging_format: The format string for log messages
49+
logging_filename: The name of the log file
50+
logging_rotate_bytes: Max size in bytes before rotating log file
51+
logging_rotate_backup_count: Number of backup files to keep when rotating
4752
"""
4853

4954
def __init__(
@@ -55,7 +60,12 @@ def __init__(
5560
cluster_id_hex: str,
5661
grpc_port: int,
5762
node_ip_address: str,
58-
log_dir: str = None,
63+
log_dir: str,
64+
logging_level: int,
65+
logging_format: str,
66+
logging_filename: str,
67+
logging_rotate_bytes: int,
68+
logging_rotate_backup_count: int,
5969
temp_dir: str = None,
6070
session_dir: str = None,
6171
minimal: bool = False,
@@ -71,6 +81,11 @@ def __init__(
7181
node_ip_address=node_ip_address,
7282
grpc_port=grpc_port,
7383
log_dir=log_dir,
84+
logging_level=logging_level,
85+
logging_format=logging_format,
86+
logging_filename=logging_filename,
87+
logging_rotate_bytes=logging_rotate_bytes,
88+
logging_rotate_backup_count=logging_rotate_backup_count,
7489
temp_dir=temp_dir,
7590
session_dir=session_dir,
7691
minimal=minimal,
@@ -236,6 +251,11 @@ async def run(self):
236251
grpc_port=args.grpc_port,
237252
node_ip_address=args.node_ip_address,
238253
log_dir=args.log_dir,
254+
logging_level=args.logging_level,
255+
logging_format=args.logging_format,
256+
logging_filename=args.logging_filename,
257+
logging_rotate_bytes=args.logging_rotate_bytes,
258+
logging_rotate_backup_count=args.logging_rotate_backup_count,
239259
temp_dir=args.temp_dir,
240260
session_dir=args.session_dir,
241261
minimal=args.minimal,

python/ray/dashboard/head.py

Lines changed: 134 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import logging
33
from concurrent.futures import ThreadPoolExecutor
44
from pathlib import Path
5-
from typing import Optional, Set
5+
from typing import Optional, Set, List, Tuple, TYPE_CHECKING
66

7+
import ray
78
import ray.dashboard.consts as dashboard_consts
89
import ray.dashboard.utils as dashboard_utils
910
import ray.experimental.internal_kv as internal_kv
@@ -26,6 +27,8 @@
2627
except ImportError:
2728
prometheus_client = None
2829

30+
if TYPE_CHECKING:
31+
from ray.dashboard.subprocesses.handle import SubprocessModuleHandle
2932

3033
logger = logging.getLogger(__name__)
3134

@@ -70,6 +73,11 @@ def __init__(
7073
node_ip_address: str,
7174
grpc_port: int,
7275
log_dir: str,
76+
logging_level: int,
77+
logging_format: str,
78+
logging_filename: str,
79+
logging_rotate_bytes: int,
80+
logging_rotate_backup_count: int,
7381
temp_dir: str,
7482
session_dir: str,
7583
minimal: bool,
@@ -83,6 +91,11 @@ def __init__(
8391
http_port_retries: The maximum retry to bind ports for the Http server.
8492
gcs_address: The GCS address in the {address}:{port} format.
8593
log_dir: The log directory. E.g., /tmp/session_latest/logs.
94+
logging_level: The logging level (e.g. logging.INFO, logging.DEBUG)
95+
logging_format: The format string for log messages
96+
logging_filename: The name of the log file
97+
logging_rotate_bytes: Max size in bytes before rotating log file
98+
logging_rotate_backup_count: Number of backup files to keep when rotating
8699
temp_dir: The temp directory. E.g., /tmp.
87100
session_dir: The session directory. E.g., tmp/session_latest.
88101
minimal: Whether or not it will load the minimal modules.
@@ -117,6 +130,11 @@ def __init__(
117130
self.gcs_address = gcs_address
118131
self.cluster_id_hex = cluster_id_hex
119132
self.log_dir = log_dir
133+
self.logging_level = logging_level
134+
self.logging_format = logging_format
135+
self.logging_filename = logging_filename
136+
self.logging_rotate_bytes = logging_rotate_bytes
137+
self.logging_rotate_backup_count = logging_rotate_backup_count
120138
self.temp_dir = temp_dir
121139
self.session_dir = session_dir
122140
self.session_name = Path(session_dir).name
@@ -137,7 +155,11 @@ def __init__(
137155
# be configured to expose APIs.
138156
self.http_server = None
139157

140-
async def _configure_http_server(self, modules):
158+
async def _configure_http_server(
159+
self,
160+
dashboard_head_modules: List[DashboardHeadModule],
161+
subprocess_module_handles: List["SubprocessModuleHandle"],
162+
):
141163
from ray.dashboard.http_server_head import HttpServerDashboardHead
142164

143165
self.http_server = HttpServerDashboardHead(
@@ -149,7 +171,7 @@ async def _configure_http_server(self, modules):
149171
self.session_name,
150172
self.metrics,
151173
)
152-
await self.http_server.run(modules)
174+
await self.http_server.run(dashboard_head_modules, subprocess_module_handles)
153175

154176
@property
155177
def http_session(self):
@@ -173,8 +195,41 @@ async def _gcs_check_alive(self):
173195
except Exception:
174196
logger.warning("Failed to check gcs aliveness, will retry", exc_info=True)
175197

176-
def _load_modules(self, modules_to_load: Optional[Set[str]] = None):
177-
"""Load dashboard head modules.
198+
def _load_modules(
    self, modules_to_load: Optional[Set[str]] = None
) -> Tuple[List[DashboardHeadModule], List["SubprocessModuleHandle"]]:
    """Load both kinds of dashboard modules and cross-check the result.

    If minimal, only `DashboardHeadModule`s are loaded (the subprocess loader
    returns an empty list). If non-minimal, both `DashboardHeadModule`s and
    `SubprocessModule` handles are loaded.

    Args:
        modules_to_load: Class names of the modules to load. If None, no
            filtering is requested and the "loaded == requested" check is
            skipped.

    Returns:
        A tuple ``(dashboard_head_modules, subprocess_module_handles)``.

    Raises:
        AssertionError: If two loaded modules share a class name, or if
            ``modules_to_load`` is given and the set of loaded class names
            differs from it.
    """
    dashboard_head_modules = self._load_dashboard_head_modules(modules_to_load)
    subprocess_module_handles = self._load_subprocess_module_handles(
        modules_to_load
    )

    all_names = {type(m).__name__ for m in dashboard_head_modules} | {
        h.module_cls.__name__ for h in subprocess_module_handles
    }

    # These checks are raised explicitly rather than written as `assert`
    # statements so they are still enforced when Python runs with -O.
    # A set collapses equal names, so a size mismatch means a name collision.
    loaded_count = len(dashboard_head_modules) + len(subprocess_module_handles)
    if len(all_names) != loaded_count:
        raise AssertionError(
            "Duplicate module names. A module name can't be a "
            "DashboardHeadModule and a SubprocessModule at the same time."
        )

    # Verify modules are loaded as expected.
    if modules_to_load is not None and all_names != modules_to_load:
        raise AssertionError(
            f"Actual loaded modules {all_names}, doesn't match the requested modules "
            f"to load, {modules_to_load}."
        )

    self._modules_loaded = True
    return dashboard_head_modules, subprocess_module_handles
228+
229+
def _load_dashboard_head_modules(
230+
self, modules_to_load: Optional[Set[str]] = None
231+
) -> List[DashboardHeadModule]:
232+
"""Load `DashboardHeadModule`s.
178233
179234
Args:
180235
modules: A list of module names to load. By default (None),
@@ -198,27 +253,72 @@ def _load_modules(self, modules_to_load: Optional[Set[str]] = None):
198253
)
199254

200255
# Select modules to load.
201-
modules_to_load = modules_to_load or {m.__name__ for m in head_cls_list}
202-
logger.info("Modules to load: %s", modules_to_load)
256+
if modules_to_load is not None:
257+
head_cls_list = [
258+
cls for cls in head_cls_list if cls.__name__ in modules_to_load
259+
]
203260

204-
for cls in head_cls_list:
205-
logger.info("Loading %s: %s", DashboardHeadModule.__name__, cls)
206-
if cls.__name__ in modules_to_load:
207-
c = cls(config)
208-
modules.append(c)
261+
logger.info(f"DashboardHeadModules to load: {modules_to_load}.")
209262

210-
# Verify modules are loaded as expected.
211-
loaded_modules = {type(m).__name__ for m in modules}
212-
if loaded_modules != modules_to_load:
213-
assert False, (
214-
"Actual loaded modules, {}, doesn't match the requested modules "
215-
"to load, {}".format(loaded_modules, modules_to_load)
216-
)
263+
for cls in head_cls_list:
264+
logger.info(f"Loading {DashboardHeadModule.__name__}: {cls}.")
265+
c = cls(config)
266+
modules.append(c)
217267

218-
self._modules_loaded = True
219-
logger.info("Loaded %d modules. %s", len(modules), modules)
268+
logger.info(f"Loaded {len(modules)} dashboard head modules: {modules}.")
220269
return modules
221270

271+
def _load_subprocess_module_handles(
    self, modules_to_load: Optional[Set[str]] = None
) -> List["SubprocessModuleHandle"]:
    """Create a `SubprocessModuleHandle` for each selected `SubprocessModule`.

    In minimal mode nothing is loaded and an empty list is returned.

    Args:
        modules_to_load: Class names of the subprocess modules to load. If
            None, every class returned by
            ``dashboard_utils.get_all_modules(SubprocessModule)`` is used.

    Returns:
        One handle per selected class, in the order the classes were listed.
    """
    if self.minimal:
        logger.info("Subprocess modules not loaded in minimal mode.")
        return []

    # Imported here rather than at module level, so minimal mode never
    # touches the subprocess package.
    from ray.dashboard.subprocesses.module import (
        SubprocessModule,
        SubprocessModuleConfig,
    )
    from ray.dashboard.subprocesses.handle import SubprocessModuleHandle

    available_classes = dashboard_utils.get_all_modules(SubprocessModule)
    event_loop = ray._common.utils.get_or_create_event_loop()

    # A single config object is shared by every handle created below.
    socket_dir = str(Path(self.session_dir) / "sockets")
    module_config = SubprocessModuleConfig(
        cluster_id_hex=self.cluster_id_hex,
        gcs_address=self.gcs_address,
        logging_level=self.logging_level,
        logging_format=self.logging_format,
        log_dir=self.log_dir,
        logging_filename=self.logging_filename,
        logging_rotate_bytes=self.logging_rotate_bytes,
        logging_rotate_backup_count=self.logging_rotate_backup_count,
        socket_dir=socket_dir,
    )

    # Keep only the requested classes; None means "no filtering".
    selected_classes = [
        module_cls
        for module_cls in available_classes
        if modules_to_load is None or module_cls.__name__ in modules_to_load
    ]

    created_handles = []
    for module_cls in selected_classes:
        logger.info(f"Loading {SubprocessModule.__name__}: {module_cls}.")
        created_handles.append(
            SubprocessModuleHandle(event_loop, module_cls, module_config)
        )

    logger.info(
        f"Loaded {len(created_handles)} subprocess modules: {created_handles}."
    )
    return created_handles
321+
222322
async def _setup_metrics(self, gcs_aio_client):
223323
metrics = DashboardPrometheusMetrics()
224324

@@ -291,12 +391,22 @@ async def _async_notify():
291391
except Exception:
292392
logger.exception(f"Error notifying coroutine {co}")
293393

294-
modules = self._load_modules(self._modules_to_load)
394+
dashboard_head_modules, subprocess_module_handles = self._load_modules(
395+
self._modules_to_load
396+
)
397+
# Parallel start all subprocess modules.
398+
for handle in subprocess_module_handles:
399+
handle.start_module()
400+
# Wait for all subprocess modules to be ready.
401+
for handle in subprocess_module_handles:
402+
handle.wait_for_module_ready()
295403

296404
http_host, http_port = self.http_host, self.http_port
297405
if self.serve_frontend:
298406
logger.info("Initialize the http server.")
299-
await self._configure_http_server(modules)
407+
await self._configure_http_server(
408+
dashboard_head_modules, subprocess_module_handles
409+
)
300410
http_host, http_port = self.http_server.get_address()
301411
logger.info(f"http server initialized at {http_host}:{http_port}")
302412
else:
@@ -336,7 +446,7 @@ async def _async_notify():
336446
DataOrganizer.purge(),
337447
DataOrganizer.organize(self._executor),
338448
]
339-
for m in modules:
449+
for m in dashboard_head_modules:
340450
concurrent_tasks.append(m.run(self.server))
341451
if self.server:
342452
concurrent_tasks.append(self.server.wait_for_termination())

0 commit comments

Comments
 (0)