Skip to content

Commit 09329fb

Browse files
authored
Speedup list_dir and list_prefix (#1257)
* Speedup list_dir and list_prefix

  This avoids going through every node each time `list_dir` is called. This is important because Zarr calls `list_dir` recursively to build `tree`. In flat repositories with ~60k nodes, we verified a ~15x performance improvement for `Group.tree`.

* Use trace log level for xarray backend tests

* Don't use an in-memory store for xarray backend tests
1 parent 3a90068 commit 09329fb

File tree

11 files changed

+160
-48
lines changed

11 files changed

+160
-48
lines changed

.github/workflows/python-check.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -178,6 +178,7 @@ jobs:
178178
working-directory: icechunk/icechunk-python
179179
env:
180180
ICECHUNK_XARRAY_BACKENDS_TESTS: 1
181+
ICECHUNK_LOG: trace
181182
run: |
182183
set -e
183184
# pass xarray's pyproject.toml so that pytest can find the `flaky` fixture

icechunk-python/tests/run_xarray_backends_tests.py

Lines changed: 13 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
from icechunk import (
1717
IcechunkStore,
1818
Repository,
19-
in_memory_storage,
2019
local_filesystem_storage,
2120
s3_storage,
2221
)
@@ -105,7 +104,8 @@ def create_repo(self) -> Generator[Repository]:
105104
class TestIcechunkStoreMemory(IcechunkStoreBase):
106105
@contextlib.contextmanager
107106
def create_repo(self) -> Generator[Repository]:
108-
yield Repository.create(in_memory_storage())
107+
with tempfile.TemporaryDirectory() as tmpdir:
108+
yield Repository.create(local_filesystem_storage(tmpdir))
109109

110110
def test_pickle(self) -> None:
111111
pytest.skip("pickling memory store is not supported")
@@ -137,9 +137,11 @@ class TestIcechunkRegionAuto(ZarrRegionAutoTests):
137137
def create_zarr_target(self) -> Generator[IcechunkStore]:
138138
if zarr.config.config["default_zarr_format"] == 2:
139139
pytest.skip("v2 not supported")
140-
repo = Repository.create(in_memory_storage())
141-
session = repo.writable_session("main")
142-
yield session.store
140+
141+
with tempfile.TemporaryDirectory() as tmpdir:
142+
repo = Repository.create(local_filesystem_storage(tmpdir))
143+
session = repo.writable_session("main")
144+
yield session.store
143145

144146
@contextlib.contextmanager
145147
def create(self):
@@ -149,11 +151,12 @@ def create(self):
149151
ds = xr.Dataset(
150152
{"test": xr.DataArray(data, dims=("x", "y"), coords={"x": x, "y": y})}
151153
)
152-
repo = Repository.create(in_memory_storage())
153-
session = repo.writable_session("main")
154-
self.save(session.store, ds)
155-
session.commit("initial commit")
156-
yield repo.writable_session("main").store, ds
154+
with tempfile.TemporaryDirectory() as tmpdir:
155+
repo = Repository.create(local_filesystem_storage(tmpdir))
156+
session = repo.writable_session("main")
157+
self.save(session.store, ds)
158+
session.commit("initial commit")
159+
yield repo.writable_session("main").store, ds
157160

158161
def save(self, target, ds, **kwargs):
159162
# not really important here

icechunk/examples/low_level_dataset.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -237,7 +237,7 @@ let ds = Repository::update(Arc::clone(&storage), ObjectId::from("{v2_id:?}"));
237237
async fn print_nodes(ds: &Session) -> Result<(), SessionError> {
238238
println!("### List of nodes");
239239
let rows = ds
240-
.list_nodes()
240+
.list_nodes(&Path::root())
241241
.await?
242242
.map(|n| n.unwrap())
243243
.sorted_by_key(|n| n.path.clone())

icechunk/src/conflicts/detector.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ impl ConflictSolver for ConflictDetector {
6363
Ok(None)
6464
});
6565

66-
let path_finder = PathFinder::new(current_repo.list_nodes().await?);
66+
let path_finder = PathFinder::new(current_repo.list_nodes(&Path::root()).await?);
6767

6868
let updated_arrays_already_updated = current_changes
6969
.updated_arrays()

icechunk/src/format/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
use core::fmt;
22
use std::{
3+
cmp::Ordering,
34
convert::Infallible,
45
fmt::{Debug, Display},
56
hash::Hash,
@@ -462,6 +463,37 @@ impl TryFrom<String> for Path {
462463
}
463464
}
464465

466+
#[inline(always)]
467+
pub fn lookup_index_by_key<'a, T: ::flatbuffers::Follow<'a> + 'a, K: Ord>(
468+
v: ::flatbuffers::Vector<'a, T>,
469+
key: K,
470+
f: fn(&<T as ::flatbuffers::Follow<'a>>::Inner, &K) -> Ordering,
471+
) -> Option<usize> {
472+
if v.is_empty() {
473+
return None;
474+
}
475+
476+
let mut left: usize = 0;
477+
let mut right = v.len() - 1;
478+
479+
while left <= right {
480+
let mid = (left + right) / 2;
481+
let value = v.get(mid);
482+
match f(&value, &key) {
483+
Ordering::Equal => return Some(mid),
484+
Ordering::Less => left = mid + 1,
485+
Ordering::Greater => {
486+
if mid == 0 {
487+
return None;
488+
}
489+
right = mid - 1;
490+
}
491+
}
492+
}
493+
494+
None
495+
}
496+
465497
#[cfg(test)]
466498
#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
467499
mod tests {

icechunk/src/format/snapshot.rs

Lines changed: 46 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,8 @@ use itertools::Itertools as _;
88
use serde::{Deserialize, Serialize};
99
use serde_json::Value;
1010

11+
use crate::format::lookup_index_by_key;
12+
1113
use super::{
1214
AttributesId, ChunkIndices, IcechunkFormatError, IcechunkFormatErrorKind,
1315
IcechunkResult, ManifestId, NodeId, Path, SnapshotId,
@@ -524,14 +526,27 @@ impl Snapshot {
524526
res.try_into()
525527
}
526528

529+
pub fn get_node_index(&self, path: &Path) -> IcechunkResult<usize> {
530+
let path_str = path.to_string();
531+
let res =
532+
lookup_index_by_key(self.root().nodes(), path_str.as_str(), |node, path| {
533+
node.path().cmp(path)
534+
})
535+
.ok_or(IcechunkFormatError::from(
536+
IcechunkFormatErrorKind::NodeNotFound { path: path.clone() },
537+
))?;
538+
Ok(res)
539+
}
540+
527541
pub fn iter(&self) -> impl Iterator<Item = IcechunkResult<NodeSnapshot>> + '_ {
528542
self.root().nodes().iter().map(|node| node.try_into().err_into())
529543
}
530544

531545
pub fn iter_arc(
532546
self: Arc<Self>,
533-
) -> impl Iterator<Item = IcechunkResult<NodeSnapshot>> {
534-
NodeIterator { snapshot: self, last_index: 0 }
547+
parent_group: &Path,
548+
) -> impl Iterator<Item = IcechunkResult<NodeSnapshot>> + use<> {
549+
NodeIterator::new(self, parent_group)
535550
}
536551

537552
pub fn len(&self) -> usize {
@@ -554,18 +569,42 @@ impl Snapshot {
554569

555570
struct NodeIterator {
556571
snapshot: Arc<Snapshot>,
557-
last_index: usize,
572+
next_index: usize,
573+
prefix: String,
574+
}
575+
576+
impl NodeIterator {
577+
fn new(snapshot: Arc<Snapshot>, parent_group: &Path) -> Self {
578+
let next_index = snapshot.get_node_index(parent_group).unwrap_or_default();
579+
let prefix = parent_group.to_string();
580+
let prefix = if prefix == "/" { String::new() } else { prefix };
581+
NodeIterator { snapshot, next_index, prefix }
582+
}
558583
}
559584

560585
impl Iterator for NodeIterator {
561586
type Item = IcechunkResult<NodeSnapshot>;
562587

563588
fn next(&mut self) -> Option<Self::Item> {
564589
let nodes = self.snapshot.root().nodes();
565-
if self.last_index < nodes.len() {
566-
let res = Some(nodes.get(self.last_index).try_into().err_into());
567-
self.last_index += 1;
568-
res
590+
if self.next_index < nodes.len() {
591+
let node: IcechunkResult<NodeSnapshot> =
592+
nodes.get(self.next_index).try_into();
593+
594+
match node {
595+
Ok(res) => {
596+
if let Some(after_prefix) =
597+
res.path.to_string().strip_prefix(self.prefix.as_str())
598+
&& (after_prefix.is_empty() || after_prefix.starts_with('/'))
599+
{
600+
self.next_index += 1;
601+
Some(Ok(res))
602+
} else {
603+
None
604+
}
605+
}
606+
Err(err) => Some(Err(err)),
607+
}
569608
} else {
570609
None
571610
}

icechunk/src/format/transaction_log.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -260,9 +260,9 @@ impl DiffBuilder {
260260

261261
pub async fn to_diff(self, from: &Session, to: &Session) -> SessionResult<Diff> {
262262
let nodes: HashMap<NodeId, Path> = from
263-
.list_nodes()
263+
.list_nodes(&Path::root())
264264
.await?
265-
.chain(to.list_nodes().await?)
265+
.chain(to.list_nodes(&Path::root()).await?)
266266
.map_ok(|n| (n.id, n.path))
267267
.try_collect()?;
268268
Ok(Diff::from_diff_builder(self, nodes))

icechunk/src/repository.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -860,7 +860,7 @@ impl Repository {
860860
if let Ok(snap) = asset_manager.fetch_snapshot(&snapshot_id).await {
861861
let snap_c = Arc::clone(&snap);
862862
for node in snap
863-
.iter_arc()
863+
.iter_arc(&Path::root())
864864
.filter_ok(|node| node.node_type() == NodeType::Array)
865865
// TODO: make configurable
866866
.take(50)

icechunk/src/session.rs

Lines changed: 30 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -372,7 +372,7 @@ impl Session {
372372
match self.get_group(&path).await {
373373
Ok(parent) => {
374374
let nodes_iter: Vec<NodeSnapshot> = self
375-
.list_nodes()
375+
.list_nodes(&path)
376376
.await?
377377
.filter_ok(|node| node.path.starts_with(&parent.path))
378378
.try_collect()?;
@@ -796,7 +796,7 @@ impl Session {
796796
pub async fn clear(&mut self) -> SessionResult<()> {
797797
// TODO: can this be a delete_group("/") instead?
798798
let to_delete: Vec<(NodeType, Path)> = self
799-
.list_nodes()
799+
.list_nodes(&Path::root())
800800
.await?
801801
.map_ok(|node| (node.node_type(), node.path))
802802
.try_collect()?;
@@ -850,10 +850,17 @@ impl Session {
850850
}
851851

852852
#[instrument(skip(self))]
853-
pub async fn list_nodes(
854-
&self,
855-
) -> SessionResult<impl Iterator<Item = SessionResult<NodeSnapshot>> + '_> {
856-
updated_nodes(&self.asset_manager, &self.change_set, &self.snapshot_id).await
853+
pub async fn list_nodes<'a>(
854+
&'a self,
855+
parent_group: &Path,
856+
) -> SessionResult<impl Iterator<Item = SessionResult<NodeSnapshot>> + use<'a>> {
857+
updated_nodes(
858+
parent_group,
859+
&self.asset_manager,
860+
&self.change_set,
861+
&self.snapshot_id,
862+
)
863+
.await
857864
}
858865

859866
#[instrument(skip(self))]
@@ -975,7 +982,7 @@ impl Session {
975982
message: &str,
976983
properties: Option<SnapshotProperties>,
977984
) -> SessionResult<SnapshotId> {
978-
let nodes = self.list_nodes().await?.collect::<Vec<_>>();
985+
let nodes = self.list_nodes(&Path::root()).await?.collect::<Vec<_>>();
979986
// We need to populate the `splits` before calling `commit`.
980987
// In the normal chunk setting workflow, that would've been done by `set_chunk_ref`
981988
for node in nodes.into_iter().flatten() {
@@ -1259,12 +1266,13 @@ impl Session {
12591266

12601267
/// Warning: The presence of a single error may mean multiple missing items
12611268
async fn updated_chunk_iterator<'a>(
1269+
parent_group: &Path,
12621270
asset_manager: &'a AssetManager,
12631271
change_set: &'a ChangeSet,
12641272
snapshot_id: &'a SnapshotId,
1265-
) -> SessionResult<impl Stream<Item = SessionResult<(Path, ChunkInfo)>> + 'a> {
1273+
) -> SessionResult<impl Stream<Item = SessionResult<(Path, ChunkInfo)>> + use<'a>> {
12661274
let snapshot = asset_manager.fetch_snapshot(snapshot_id).await?;
1267-
let nodes = futures::stream::iter(snapshot.iter_arc());
1275+
let nodes = futures::stream::iter(snapshot.iter_arc(parent_group));
12681276
let res = nodes.and_then(move |node| async move {
12691277
// Note: Confusingly, these NodeSnapshot instances have the metadata stored in the snapshot.
12701278
// We have not applied any changeset updates. At the moment, the downstream code only
@@ -1477,14 +1485,15 @@ pub async fn get_chunk(
14771485

14781486
/// Yields nodes in the base snapshot, applying any relevant updates in the changeset
14791487
async fn updated_existing_nodes<'a>(
1488+
parent_group: &Path,
14801489
asset_manager: &AssetManager,
14811490
change_set: &'a ChangeSet,
14821491
parent_id: &SnapshotId,
1483-
) -> SessionResult<impl Iterator<Item = SessionResult<NodeSnapshot>> + 'a + use<'a>> {
1492+
) -> SessionResult<impl Iterator<Item = SessionResult<NodeSnapshot>> + use<'a>> {
14841493
let updated_nodes = asset_manager
14851494
.fetch_snapshot(parent_id)
14861495
.await?
1487-
.iter_arc()
1496+
.iter_arc(parent_group)
14881497
.filter_map_ok(move |node| change_set.update_existing_node(node))
14891498
.map(|n| match n {
14901499
Ok(n) => Ok(n),
@@ -1497,11 +1506,12 @@ async fn updated_existing_nodes<'a>(
14971506
/// Yields nodes with the snapshot, applying any relevant updates in the changeset,
14981507
/// *and* new nodes in the changeset
14991508
async fn updated_nodes<'a>(
1509+
parent_group: &Path,
15001510
asset_manager: &AssetManager,
15011511
change_set: &'a ChangeSet,
15021512
parent_id: &SnapshotId,
1503-
) -> SessionResult<impl Iterator<Item = SessionResult<NodeSnapshot>> + 'a + use<'a>> {
1504-
Ok(updated_existing_nodes(asset_manager, change_set, parent_id)
1513+
) -> SessionResult<impl Iterator<Item = SessionResult<NodeSnapshot>> + use<'a>> {
1514+
Ok(updated_existing_nodes(parent_group, asset_manager, change_set, parent_id)
15051515
.await?
15061516
.chain(change_set.new_nodes_iterator().map(Ok)))
15071517
}
@@ -1586,7 +1596,8 @@ async fn all_chunks<'a>(
15861596
snapshot_id: &'a SnapshotId,
15871597
) -> SessionResult<impl Stream<Item = SessionResult<(Path, ChunkInfo)>> + 'a> {
15881598
let existing_array_chunks =
1589-
updated_chunk_iterator(asset_manager, change_set, snapshot_id).await?;
1599+
updated_chunk_iterator(&Path::root(), asset_manager, change_set, snapshot_id)
1600+
.await?;
15901601
let new_array_chunks =
15911602
futures::stream::iter(change_set.new_arrays_chunk_iterator().map(Ok));
15921603
Ok(existing_array_chunks.chain(new_array_chunks))
@@ -2014,6 +2025,7 @@ async fn flush(
20142025
// gather and sort nodes:
20152026
// this is a requirement for Snapshot::from_iter
20162027
let mut all_nodes: Vec<_> = updated_nodes(
2028+
&Path::root(),
20172029
flush_data.asset_manager.as_ref(),
20182030
flush_data.change_set,
20192031
flush_data.parent_id,
@@ -3213,15 +3225,15 @@ mod tests {
32133225
ds.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?;
32143226
ds.add_group("/1".try_into().unwrap(), Bytes::copy_from_slice(b"")).await?;
32153227
ds.delete_group("/1".try_into().unwrap()).await?;
3216-
assert_eq!(ds.list_nodes().await?.count(), 1);
3228+
assert_eq!(ds.list_nodes(&Path::root()).await?.count(), 1);
32173229
ds.commit("commit", None).await?;
32183230

32193231
let ds = repository
32203232
.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
32213233
.await?;
32223234
assert!(ds.get_group(&Path::root()).await.is_ok());
32233235
assert!(ds.get_group(&"/1".try_into().unwrap()).await.is_err());
3224-
assert_eq!(ds.list_nodes().await?.count(), 1);
3236+
assert_eq!(ds.list_nodes(&Path::root()).await?.count(), 1);
32253237
Ok(())
32263238
}
32273239

@@ -3237,7 +3249,7 @@ mod tests {
32373249
ds.delete_group("/1".try_into().unwrap()).await?;
32383250
assert!(ds.get_group(&Path::root()).await.is_ok());
32393251
assert!(ds.get_group(&"/1".try_into().unwrap()).await.is_err());
3240-
assert_eq!(ds.list_nodes().await?.count(), 1);
3252+
assert_eq!(ds.list_nodes(&Path::root()).await?.count(), 1);
32413253
Ok(())
32423254
}
32433255

@@ -3255,7 +3267,7 @@ mod tests {
32553267
let ds = repository
32563268
.readonly_session(&VersionInfo::BranchTipRef("main".to_string()))
32573269
.await?;
3258-
assert_eq!(ds.list_nodes().await?.count(), 0);
3270+
assert_eq!(ds.list_nodes(&Path::root()).await?.count(), 0);
32593271
Ok(())
32603272
}
32613273

0 commit comments

Comments (0)