
Commit 09f5ece

paraseba and rabernat authored
New knob for stream stall detection on S3 storage (#1170)
* New knob for stream stall detection on S3 storage. Affects `s3_storage`, `s3_store`, `tigris_storage`, `r2_storage`. They all now support a new `network_stream_timeout_seconds` argument with default 20. If set to 0, stream stall detection is deactivated.
* Change default to 60
* Fix backwards compatibility
* Fix S3Options constructor in pyi
* Added description of new config settings
* Fix documentation link and typo
* Fix link

Co-authored-by: Ryan Abernathey <ryan.abernathey@gmail.com>
1 parent a8a04b7 commit 09f5ece


14 files changed (+231 −23 lines)


docs/docs/configuration.md

Lines changed: 5 additions & 0 deletions
@@ -28,6 +28,11 @@ The threshold for when to inline a chunk into a manifest instead of storing it a
 
 The number of concurrent requests to make when getting partial values from storage.
 
+### [`max_concurrent_requests`](./reference.md#icechunk.RepositoryConfig.max_concurrent_requests)
+
+The maximum total number of concurrent requests this repo will allow.
+See [Performance | Concurrency](../performance#Concurrency) for details.
+
 ### [`compression`](./reference.md#icechunk.RepositoryConfig.compression)
 
 Icechunk uses Zstd compression to compress its metadata files. [`CompressionConfig`](./reference.md#icechunk.CompressionConfig) allows you to configure the [compression level](./reference.md#icechunk.CompressionConfig.level) and [algorithm](./reference.md#icechunk.CompressionConfig.algorithm). Currently, the only algorithm available is [`Zstd`](https://facebook.github.io/zstd/).

docs/docs/performance.md

Lines changed: 113 additions & 8 deletions
@@ -1,9 +1,92 @@
 # Performance
 
-!!! info
+## Concurrency
 
-    This is advanced material, and you will need it only if you have arrays with more than a million chunks.
-    Icechunk aims to provide an excellent experience out of the box.
+Optimal I/O throughput with object storage is achieved when there are many HTTP requests running concurrently.
+Icechunk supports this by using Rust's high-performance Tokio asynchronous runtime to issue these requests.
+
+### Setting Zarr's Async Concurrency Configuration
+
+Icechunk is used in conjunction with Zarr Python.
+The level of concurrency used in each operation is controlled by the Zarr `async.concurrency` config parameter.
+
+```python
+import zarr
+print(zarr.config.get("async.concurrency"))
+# -> 10 (default)
+```
+
+Large machines in close proximity to object storage can benefit from much more concurrency. For high-performance configurations, we recommend much higher values, e.g.
+
+```python
+zarr.config.set({"async.concurrency": 128})
+```
+
+Note that this concurrency limit is _per individual Zarr Array read/write operation_:
+
+```python
+# chunks fetched concurrently up to the async.concurrency limit
+data = array[:]
+# chunks written concurrently up to the async.concurrency limit
+array[:] = data
+```
+
+### Dask and Multi-Tiered Concurrency
+
+Using Dask with Zarr introduces _another_ layer of concurrency: the number of Dask threads or workers.
+If each Dask task addresses multiple Zarr chunks, the amount of concurrency multiplies.
+In these circumstances, it is possible to generate _too much concurrency_.
+If there are **thousands** of concurrent HTTP requests in flight, they may start to stall or time out.
+To prevent this, Icechunk introduces a global concurrency limit.
+
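The multiplication above can be made concrete with a back-of-the-envelope sketch (the worker count and concurrency settings here are hypothetical examples, not defaults):

```python
# Hypothetical sizing: each Dask worker runs one task at a time, and each
# task's Zarr operation may issue up to async.concurrency requests.
dask_workers = 8
zarr_concurrency = 128  # the zarr "async.concurrency" setting

potential_requests = dask_workers * zarr_concurrency
print(potential_requests)  # -> 1024 requests potentially in flight

# Icechunk's global cap (default 256) bounds what actually reaches storage.
max_concurrent_requests = 256
in_flight = min(potential_requests, max_concurrent_requests)
print(in_flight)  # -> 256
```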
+### Icechunk Global Concurrency Limit
+
+Each Icechunk repo has a cap on the maximum number of concurrent requests it will make.
+The default concurrency limit is 256.
+
+For example, the following code sets this limit to 10:
+
+```python
+config = icechunk.RepositoryConfig(max_concurrent_requests=10)
+repo = icechunk.Repository.open(
+    storage=storage,
+    config=config,
+)
+```
+
+In this configuration, even if the upper layers of the stack (Dask and Zarr) issue many more concurrent requests, Icechunk will only open 10 HTTP connections to the object store at once.
+
+### Stalled Network Streams
+
+A stalled network stream is an HTTP connection which does not transfer any data over a certain period.
+Stalled connections may occur in the following situations:
+
+- When the client is connecting to a remote object store over a slow network connection.
+- When the client is behind a VPN or proxy server which limits the number or throughput of connections between the client and the remote object store.
+- When the client tries to issue a high volume of concurrent requests. (The global concurrency limit described above should help avoid this, but the precise limit is hardware- and network-dependent.)
+
+By default, Icechunk detects stalled HTTP connections and raises an error when it sees one.
+These errors typically contain lines like
+
+```
+|-> I/O error
+|-> streaming error
+`-> minimum throughput was specified at 1 B/s, but throughput of 0 B/s was observed
+```
+
+This behavior is configurable when creating a new `Storage` object, via the `network_stream_timeout_seconds` parameter.
+The default is 60 seconds.
+To set a different value, specify it as follows:
+
+```python
+storage = icechunk.s3_storage(
+    **other_storage_kwargs,
+    network_stream_timeout_seconds=50,
+)
+repo = icechunk.Repository.open(storage=storage)
+```
+
+Specifying a value of 0 disables this check entirely.
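The timeout semantics can be illustrated with a toy sketch (purely illustrative, not Icechunk's actual Rust implementation; `stream_stalled` is an invented name):

```python
def stream_stalled(bytes_since_progress: int,
                   seconds_since_progress: float,
                   timeout_seconds: int) -> bool:
    """Toy model of stall detection: return True if the stream should abort.

    Mirrors the documented semantics: a timeout of 0 disables the check.
    """
    if timeout_seconds == 0:
        return False
    return bytes_since_progress == 0 and seconds_since_progress > timeout_seconds

print(stream_stalled(0, 75.0, 60))  # -> True: no data for longer than the timeout
print(stream_stalled(0, 75.0, 0))   # -> False: detection disabled
```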

 ## Scalability

@@ -41,6 +124,11 @@ on Icechunk scalability.
 
 ## Splitting manifests
 
+!!! info
+
+    This is advanced material, and you will need it only if you have arrays with more than a million chunks.
+    Icechunk aims to provide an excellent experience out of the box.
+
 Icechunk stores chunk references in a chunk manifest file stored in `manifests/`.
 By default, Icechunk stores all chunk references in a single manifest file per array.
 For very large arrays (millions of chunks), these files can get quite large.
@@ -53,10 +141,10 @@ downloading and rewriting the entire manifest.
 
 Note that the chunk sizes in the following examples are tiny for demonstration purposes.
 
-
 ### Configuring splitting
 
-To solve this issue, Icechunk lets you __split__ the manifest files by specifying a ``ManifestSplittingConfig``.
+To solve this issue, Icechunk lets you **split** the manifest files by specifying a ``ManifestSplittingConfig``.
+
 ```python exec="on" session="perf" source="material-block"
 import icechunk as ic
 from icechunk import ManifestSplitCondition, ManifestSplittingConfig, ManifestSplitDimCondition
@@ -74,6 +162,7 @@ repo_config = ic.RepositoryConfig(
 ```
 
 Then pass the `config` to `Repository.open` or `Repository.create`
+
 ```python
 repo = ic.Repository.open(..., config=repo_config)
 ```
@@ -137,12 +226,12 @@ will result in splitting manifests so that each manifest contains (3 longitude c
 
 Python dictionaries preserve insertion order, so the first condition encountered takes priority.
 
-
 ### Splitting behaviour
 
 By default, Icechunk minimizes the number of chunk refs that are written in a single commit.
 
 Consider this simple example: a 1D array with split size 1 along axis 0.
+
 ```python exec="on" session="perf" source="material-block"
 import random
 
@@ -166,6 +255,7 @@ repo = ic.Repository.create(storage, config=repo_config)
 ```
 
 Create an array
+
 ```python exec="on" session="perf" source="material-block"
 import zarr
 
@@ -176,6 +266,7 @@ array = root.create_array(name=name, shape=(10,), dtype=int, chunks=(1,))
 ```
 
 Now let's write 5 chunk references
+
 ```python exec="on" session="perf" source="material-block"
 import numpy as np
 
@@ -184,16 +275,19 @@ print(session.status())
 ```
 
 And commit
+
 ```python exec="on" session="perf" source="material-block"
 snap = session.commit("Add 5 chunks")
 ```
 
 Use [`repo.lookup_snapshot`](./reference.md#icechunk.Repository.lookup_snapshot) to examine the manifests associated with a Snapshot
+
 ```python exec="on" session="perf" source="material-block"
 print(repo.lookup_snapshot(snap).manifests)
 ```
 
 Let's open the Repository again with a different splitting config --- where 5 chunk references are in a single manifest.
+
 ```python exec="on" session="perf" source="material-block"
 split_config = ManifestSplittingConfig.from_dict(
     {ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 5}}
@@ -204,6 +298,7 @@ print(new_repo.config.manifest)
 ```
 
 Now let's append data.
+
 ```python exec="on" session="perf" source="material-block"
 session = new_repo.writable_session("main")
 array = zarr.open_array(session.store, path=name, mode="a")
@@ -235,6 +330,7 @@ print(session.status())
 snap3 = session.commit("rewrite [3,7)")
 print(repo.lookup_snapshot(snap3).manifests)
 ```
+
 This ends up rewriting all refs to two new manifests.
 
 ### Rewriting manifests
@@ -247,6 +343,7 @@ At that point, you will want to experiment with different manifest split configu
 To force Icechunk to rewrite all chunk refs to the current splitting configuration use [`rewrite_manifests`](./reference.md#icechunk.Repository.rewrite_manifests)
 
 To illustrate, we will use a split size of 3 --- for the current example this will consolidate to two manifests.
+
 ```python exec="on" session="perf" source="material-block"
 split_config = ManifestSplittingConfig.from_dict(
     {ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 3}}
@@ -262,11 +359,13 @@ snap4 = new_repo.rewrite_manifests(
 ```
 
 `rewrite_manifests` will create a new commit on `branch` with the provided `message`.
+
 ```python exec="on" session="perf" source="material-block"
 print(repo.lookup_snapshot(snap4).manifests)
 ```
 
 The splitting configuration is saved in the snapshot metadata.
+
 ```python exec="on" session="perf" source="material-block"
 print(repo.lookup_snapshot(snap4).metadata)
 ```
@@ -275,7 +374,6 @@ print(repo.lookup_snapshot(snap4).metadata)
 
 Once you find a splitting configuration you like, remember to persist it on-disk using `repo.save_config`.
 
-
 ### Example workflow
 
 Here is an example workflow for experimenting with splitting
@@ -293,31 +391,38 @@ repo = ic.Repository.open(storage, config=repo_config)
 ```
 
 We will rewrite the manifests on a different branch
+
 ```python exec="on" session="perf" source="material-block"
 repo.create_branch("split-experiment-1", repo.lookup_branch("main"))
 snap = repo.rewrite_manifests(
     "rewrite_manifests with new config", branch="split-experiment-1"
 )
 print(repo.lookup_snapshot(snap).manifests)
 ```
+
 Now benchmark reads on `main` vs `split-experiment-1`
+
 ```python exec="on" session="perf" source="material-block"
 store = repo.readonly_session("main").store
 store_split = repo.readonly_session("split-experiment-1").store
 # ...
 ```
+
 Assume we decided the configuration on `split-experiment-1` was good.
 First we persist that configuration to disk
+
 ```python exec="on" session="perf" source="material-block"
 repo.save_config()
 ```
 
 Now point the `main` branch to the commit with rewritten manifests
+
 ```python exec="on" session="perf" source="material-block"
 repo.reset_branch("main", repo.lookup_branch("split-experiment-1"))
 ```
 
 Notice that the persisted config is restored when opening a Repository
+
 ```python exec="on" session="perf" source="material-block"
 print(ic.Repository.open(storage).config.manifest)
 ```
@@ -351,6 +456,7 @@ repo = ic.Repository.open(..., config=repo_config)
 ```
 
 This example will preload all manifests that match the regex "x" when opening a Session. While this is a simple example, you can use the `ManifestPreloadCondition` class to create more complex preload conditions using the following options:
+
 - `ManifestPreloadCondition.name_matches` takes a regular expression used to match an array's name;
 - `ManifestPreloadCondition.path_matches` takes a regular expression used to match an array's path;
 - `ManifestPreloadCondition.and_conditions` to combine (1), (2), and (4) together; and
@@ -414,7 +520,6 @@ This will preload all manifests that match the array name "x" while the number o
 
 Once you find a preload configuration you like, remember to persist it on-disk using `repo.save_config`. The saved config can be overridden at runtime for different applications.
 
-
 #### Default preload configuration
 
 Icechunk has a default `preload_if` configuration that will preload all manifests that match [cf-xarray's coordinate axis regex](https://github.com/xarray-contrib/cf-xarray/blob/1591ff5ea7664a6bdef24055ef75e242cd5bfc8b/cf_xarray/criteria.py#L149-L160).

icechunk-python/python/icechunk/_icechunk_python.pyi

Lines changed: 4 additions & 0 deletions
@@ -13,6 +13,7 @@ class S3Options:
         allow_http: bool = False,
         anonymous: bool = False,
         force_path_style: bool = False,
+        network_stream_timeout_seconds: int | None = None,
     ) -> None:
         """
         Create a new `S3Options` object
@@ -29,6 +30,9 @@ class S3Options:
             Whether to use anonymous credentials to the storage backend. When `True`, the s3 requests will not be signed.
         force_path_style: bool
             Whether to force use of path-style addressing for buckets.
+        network_stream_timeout_seconds: int | None
+            Timeout requests if no bytes can be transmitted during this period of time.
+            If set to 0, the timeout is disabled. Default is 60 seconds.
         """
 
 class ObjectStoreConfig:
