Skip to content

Commit 038bd85

Browse files
MengjinYandayshahedoakes
authored
[Core] Making Object Store Fallback Directory Configurable (#51189)
Recently we investigated issue where a job ran out of disk because the fallback directory is full. Currently, the fallback directory is set to /tmp folder which might have a size constraint and is not able to let the job to use the full disk when it is necessary. (Updated with [541b5fd](541b5fd)) This PR: 1. Makes the fallback path configurable by setting to the same as the object spilling directory if object spilling is set to file system. 2. Exposes the object spilling path through both the `ray start` CLI command option `--object-spilling-storage-path` and the `ray.init()` parameter `object_spilling_storage_path`. For the public API, the input path will by default be a file system path and for now, only one path can be specified. At the same time, the previous logic of setting the object spilling config will still work for advanced settings. But the path passed in through the public API will take precedence. The corresponding documentation about spilling object is updated acoordingly. In addition to the test added to the PR, the cli config is also manually tested: When running `ray start` with the following: ``` ray --logging-level=DEBUG start --head --object-spilling-storage-path="/tmp/spill/" ``` The object spill path and the fallback directory will both be configured to "/tmp/spill": ``` 2025-03-19 22:49:49,337 DEBUG node.py:242 -- Starting node with object spilling config: {"type": "filesystem", "params": {"directory_path": "/tmp/spill/"}} 2025-03-19 22:49:50,014 DEBUG services.py:2172 -- Determine to start the Plasma object store with 2.15 GB memory using /tmp and fallback to /tmp/spill ``` First version: > This PR make the fallback directory configurable so that it can be configured to a larger disk space when it is needed. The configuration is done by setting the `--fallback-directory` option in the `ray start` command. > > The PR is manually tested: > 1. When running `ray start` without the config: > ``` > ray start --head > ``` > The fallback directory is set to `/tmp` in `raylet.out`: > ``` > [2025-03-08 19:50:31,664 I 31368 35096955] (raylet) dlmalloc.cc:321: Setting dlmalloc config: plasma_directory=/tmp, fallback_directory=/tmp, hugepage_enabled=0, fallback_enabled=1 > ``` > 2. When running `ray start` with the config: > ``` > ray start --head --fallback-directory="/tmp/fallback" > ``` > The fallback directory is set to `/tmp/fallback` in `raylet.out`: > ``` > [2025-03-08 20:23:25,279 I 61733 35267407] (raylet) dlmalloc.cc:321: Setting dlmalloc config: plasma_directory=/tmp, fallback_directory=/tmp/fallback, hugepage_enabled=0, fallback_enabled=1 > ``` --------- Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com> Co-authored-by: Dhyey Shah <dhyey2019@gmail.com> Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent d95a57a commit 038bd85

File tree

14 files changed

+399
-239
lines changed

14 files changed

+399
-239
lines changed

doc/source/ray-core/objects/object-spilling.rst

Lines changed: 16 additions & 185 deletions
Original file line numberDiff line numberDiff line change
@@ -2,205 +2,36 @@ Object Spilling
22
===============
33
.. _object-spilling:
44

5-
Ray 1.3+ spills objects to external storage once the object store is full. By default, objects are spilled to Ray's temporary directory in the local filesystem.
5+
Ray spills objects to a directory in the local filesystem once the object store is full. By default, Ray
6+
spills objects to the temporary directory (for example, ``/tmp/ray/session_2025-03-28_00-05-20_204810_2814690``).
67

7-
Single node
8-
-----------
8+
Spilling to a custom directory
9+
-------------------------------
910

10-
Ray uses object spilling by default. Without any setting, objects are spilled to `[temp_folder]/spill`. On Linux and MacOS, the `temp_folder` is `/tmp` by default.
11+
You can specify a custom directory for spilling objects by setting the
12+
``object_spilling_directory`` parameter in the ``ray.init`` function or the
13+
``--object-spilling-directory`` command line option in the ``ray start`` command.
1114

12-
To configure the directory where objects are spilled to, use:
15+
.. tab-set::
1316

14-
.. testcode::
15-
:hide:
17+
.. tab-item:: Python
1618

17-
import ray
18-
ray.shutdown()
19+
.. doctest::
1920

20-
.. testcode::
21+
ray.init(object_spilling_directory="/path/to/spill/dir")
2122

22-
import json
23-
import ray
24-
25-
ray.init(
26-
_system_config={
27-
"object_spilling_config": json.dumps(
28-
{"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
29-
)
30-
},
31-
)
23+
.. tab-item:: CLI
3224

33-
You can also specify multiple directories for spilling to spread the IO load and disk space
34-
usage across multiple physical devices if needed (e.g., SSD devices):
25+
.. doctest::
3526

36-
.. testcode::
37-
:hide:
27+
ray start --object-spilling-directory=/path/to/spill/dir
3828

39-
ray.shutdown()
40-
41-
.. testcode::
42-
43-
import json
44-
import ray
45-
46-
ray.init(
47-
_system_config={
48-
"max_io_workers": 4, # More IO workers for parallelism.
49-
"object_spilling_config": json.dumps(
50-
{
51-
"type": "filesystem",
52-
"params": {
53-
# Multiple directories can be specified to distribute
54-
# IO across multiple mounted physical devices.
55-
"directory_path": [
56-
"/tmp/spill",
57-
"/tmp/spill_1",
58-
"/tmp/spill_2",
59-
]
60-
},
61-
}
62-
)
63-
},
64-
)
65-
66-
67-
.. note::
68-
69-
To optimize the performance, it is recommended to use an SSD instead of an HDD when using object spilling for memory-intensive workloads.
70-
71-
If you are using an HDD, it is recommended that you specify a large buffer size (> 1MB) to reduce IO requests during spilling.
72-
73-
.. testcode::
74-
:hide:
75-
76-
ray.shutdown()
77-
78-
.. testcode::
79-
80-
import json
81-
import ray
82-
83-
ray.init(
84-
_system_config={
85-
"object_spilling_config": json.dumps(
86-
{
87-
"type": "filesystem",
88-
"params": {
89-
"directory_path": "/tmp/spill",
90-
"buffer_size": 1_000_000,
91-
}
92-
},
93-
)
94-
},
95-
)
96-
97-
To prevent running out of disk space, local object spilling will throw ``OutOfDiskError`` if the disk utilization exceeds the predefined threshold.
98-
If multiple physical devices are used, any physical device's over-usage will trigger the ``OutOfDiskError``.
99-
The default threshold is 0.95 (95%). You can adjust the threshold by setting ``local_fs_capacity_threshold``, or set it to 1 to disable the protection.
100-
101-
.. testcode::
102-
:hide:
103-
104-
ray.shutdown()
105-
106-
.. testcode::
107-
108-
import json
109-
import ray
110-
111-
ray.init(
112-
_system_config={
113-
# Allow spilling until the local disk is 99% utilized.
114-
# This only affects spilling to the local file system.
115-
"local_fs_capacity_threshold": 0.99,
116-
"object_spilling_config": json.dumps(
117-
{
118-
"type": "filesystem",
119-
"params": {
120-
"directory_path": "/tmp/spill",
121-
}
122-
},
123-
)
124-
},
125-
)
126-
127-
128-
To enable object spilling to remote storage (any URI supported by `smart_open <https://pypi.org/project/smart-open/>`__):
129-
130-
.. testcode::
131-
:hide:
132-
133-
ray.shutdown()
134-
135-
.. testcode::
136-
:skipif: True
137-
138-
import json
139-
import ray
140-
141-
ray.init(
142-
_system_config={
143-
"max_io_workers": 4, # More IO workers for remote storage.
144-
"min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
145-
"object_spilling_config": json.dumps(
146-
{
147-
"type": "smart_open",
148-
"params": {
149-
"uri": "s3://bucket/path"
150-
},
151-
"buffer_size": 100 * 1024 * 1024, # Use a 100MB buffer for writes
152-
},
153-
)
154-
},
155-
)
156-
157-
It is recommended that you specify a large buffer size (> 1MB) to reduce IO requests during spilling.
158-
159-
Spilling to multiple remote storages is also supported.
160-
161-
.. testcode::
162-
:hide:
163-
164-
ray.shutdown()
165-
166-
.. testcode::
167-
:skipif: True
168-
169-
import json
170-
import ray
171-
172-
ray.init(
173-
_system_config={
174-
"max_io_workers": 4, # More IO workers for remote storage.
175-
"min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
176-
"object_spilling_config": json.dumps(
177-
{
178-
"type": "smart_open",
179-
"params": {
180-
"uri": ["s3://bucket/path1", "s3://bucket/path2", "s3://bucket/path3"],
181-
},
182-
"buffer_size": 100 * 1024 * 1024, # Use a 100MB buffer for writes
183-
},
184-
)
185-
},
186-
)
187-
188-
Remote storage support is still experimental.
189-
190-
Cluster mode
191-
------------
192-
To enable object spilling in multi node clusters:
193-
194-
.. code-block:: bash
195-
196-
# Note that `object_spilling_config`'s value should be json format.
197-
# You only need to specify the config when starting the head node, all the worker nodes will get the same config from the head node.
198-
ray start --head --system-config='{"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}'
29+
For advanced usage and customizations, reach out to the `Ray team <https://www.ray.io/community>`_.
19930

20031
Stats
20132
-----
20233

203-
When spilling is happening, the following INFO level messages will be printed to the raylet logs (e.g., ``/tmp/ray/session_latest/logs/raylet.out``)::
34+
When spilling is happening, the following INFO level messages are printed to the Raylet logs-for example, ``/tmp/ray/session_latest/logs/raylet.out``::
20435

20536
local_object_manager.cc:166: Spilled 50 MiB, 1 objects, write throughput 230 MiB/s
20637
local_object_manager.cc:334: Restored 50 MiB, 1 objects, read throughput 505 MiB/s

python/ray/_private/arrow_serialization.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import os
88
import sys
99
from typing import List, Tuple, Optional, TYPE_CHECKING
10+
from ray._private.utils import is_in_test
1011

1112
if TYPE_CHECKING:
1213
import pyarrow
@@ -24,22 +25,6 @@
2425
# Whether we have already warned the user about bloated fallback serialization.
2526
_serialization_fallback_set = set()
2627

27-
# Whether we're currently running in a test, either local or CI.
28-
_in_test = None
29-
30-
31-
def _is_in_test():
32-
global _in_test
33-
34-
if _in_test is None:
35-
_in_test = any(
36-
env_var in os.environ
37-
# These environment variables are always set by pytest and Buildkite,
38-
# respectively.
39-
for env_var in ("PYTEST_CURRENT_TEST", "BUILDKITE")
40-
)
41-
return _in_test
42-
4328

4429
def _register_custom_datasets_serializers(serialization_context):
4530
try:
@@ -148,7 +133,7 @@ def _arrow_table_reduce(t: "pyarrow.Table"):
148133
# Delegate to ChunkedArray reducer.
149134
reduced_column = _arrow_chunked_array_reduce(column)
150135
except Exception as e:
151-
if not _is_dense_union(column.type) and _is_in_test():
136+
if not _is_dense_union(column.type) and is_in_test():
152137
# If running in a test and the column is not a dense union array
153138
# (which we expect to need a fallback), we want to raise the error,
154139
# not fall back.

0 commit comments

Comments
 (0)