Skip to content

Commit 3374ca4

Browse files
authored
Add optional default commit metadata to Repository (#860)
1 parent 0c41b9c commit 3374ca4

File tree

7 files changed

+162
-2
lines changed

7 files changed

+162
-2
lines changed

icechunk-python/python/icechunk/_icechunk_python.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,10 @@ class PyRepository:
964964
def save_config(self) -> None: ...
965965
def config(self) -> RepositoryConfig: ...
966966
def storage(self) -> Storage: ...
967+
def set_default_commit_metadata(
968+
self, metadata: dict[str, Any] | None = None
969+
) -> None: ...
970+
def default_commit_metadata(self) -> dict[str, Any] | None: ...
967971
def async_ancestry(
968972
self,
969973
*,

icechunk-python/python/icechunk/repository.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import datetime
22
from collections.abc import AsyncIterator, Iterator
3-
from typing import Self, cast
3+
from typing import Any, Self, cast
44

55
from icechunk._icechunk_python import (
66
Diff,
@@ -216,6 +216,36 @@ def storage(self) -> Storage:
216216
"""
217217
return self._repository.storage()
218218

219+
def set_default_commit_metadata(self, metadata: dict[str, Any] | None = None) -> None:
220+
"""
221+
Set the default commit metadata for the repository. This is useful for providing
222+
addition static system conexted metadata to all commits.
223+
224+
When a commit is made, the metadata will be merged with the metadata provided, with any
225+
duplicate keys being overwritten by the metadata provided in the commit.
226+
227+
!!! warning
228+
This metadata is only applied to sessions that are created after this call. Any open
229+
writable sessions will not be affected and will not use the new default metadata.
230+
231+
Parameters
232+
----------
233+
metadata : dict[str, Any], optional
234+
The default commit metadata.
235+
"""
236+
return self._repository.set_default_commit_metadata(metadata)
237+
238+
def default_commit_metadata(self) -> dict[str, Any] | None:
239+
"""
240+
Get the current configured default commit metadata for the repository.
241+
242+
Returns
243+
-------
244+
dict[str, Any] | None
245+
The default commit metadata.
246+
"""
247+
return self._repository.default_commit_metadata()
248+
219249
def ancestry(
220250
self,
221251
*,

icechunk-python/src/repository.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,25 @@ impl PyRepository {
550550
PyStorage(Arc::clone(self.0.storage()))
551551
}
552552

553+
#[pyo3(signature = (metadata))]
554+
pub fn set_default_commit_metadata(
555+
&self,
556+
metadata: Option<PySnapshotProperties>,
557+
) -> PyResult<()> {
558+
let metadata = metadata.map(|m| m.into());
559+
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
560+
self.0.set_default_commit_metadata(metadata).await;
561+
Ok(())
562+
})
563+
}
564+
565+
pub fn default_commit_metadata(&self) -> PyResult<Option<PySnapshotProperties>> {
566+
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
567+
let metadata = self.0.default_commit_metadata().await;
568+
Ok(metadata.map(|m| m.into()))
569+
})
570+
}
571+
553572
/// Returns an object that is both a sync and an async iterator
554573
#[pyo3(signature = (*, branch = None, tag = None, snapshot_id = None))]
555574
pub fn async_ancestry(

icechunk-python/tests/test_timetravel.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,3 +293,17 @@ async def test_session_with_as_of() -> None:
293293
expected_children = {f"child {j}" for j in range(i)}
294294
actual_children = {g[0] for g in group.members()}
295295
assert expected_children == actual_children
296+
297+
298+
async def test_default_commit_metadata() -> None:
299+
repo = ic.Repository.create(
300+
storage=ic.in_memory_storage(),
301+
)
302+
303+
repo.set_default_commit_metadata({"user": "test"})
304+
session = repo.writable_session("main")
305+
root = zarr.group(store=session.store, overwrite=True)
306+
root.create_group("child")
307+
sid = session.commit("root")
308+
snap = next(repo.ancestry(snapshot_id=sid))
309+
assert snap.metadata == {"user": "test"}

icechunk/src/asset_manager.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,16 @@ impl AssetManager {
323323
.get_snapshot_last_modified(&self.storage_settings, snapshot_id)
324324
.await?)
325325
}
326+
327+
#[instrument(skip(self))]
328+
pub async fn fetch_snapshot_info(
329+
&self,
330+
snapshot_id: &SnapshotId,
331+
) -> RepositoryResult<SnapshotInfo> {
332+
let snapshot = self.fetch_snapshot(snapshot_id).await?;
333+
let info = snapshot.as_ref().try_into()?;
334+
Ok(info)
335+
}
326336
}
327337

328338
fn binary_file_header(

icechunk/src/repository.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use futures::{
1616
use regex::bytes::Regex;
1717
use serde::{Deserialize, Serialize};
1818
use thiserror::Error;
19+
use tokio::sync::RwLock;
1920
use tokio::task::JoinError;
2021
use tracing::{Instrument, debug, error, instrument, trace};
2122

@@ -27,7 +28,9 @@ use crate::{
2728
format::{
2829
IcechunkFormatError, IcechunkFormatErrorKind, ManifestId, NodeId, Path,
2930
SnapshotId,
30-
snapshot::{ManifestFileInfo, NodeData, Snapshot, SnapshotInfo},
31+
snapshot::{
32+
ManifestFileInfo, NodeData, Snapshot, SnapshotInfo, SnapshotProperties,
33+
},
3134
transaction_log::{Diff, DiffBuilder},
3235
},
3336
refs::{
@@ -136,6 +139,8 @@ pub struct Repository {
136139
asset_manager: Arc<AssetManager>,
137140
virtual_resolver: Arc<VirtualChunkResolver>,
138141
virtual_chunk_credentials: HashMap<ContainerName, Credentials>,
142+
#[serde(skip)]
143+
default_commit_metadata: Arc<RwLock<Option<SnapshotProperties>>>,
139144
}
140145

141146
impl Repository {
@@ -307,6 +312,7 @@ impl Repository {
307312
virtual_resolver,
308313
asset_manager,
309314
virtual_chunk_credentials,
315+
default_commit_metadata: Arc::new(RwLock::new(None)),
310316
})
311317
}
312318

@@ -375,6 +381,20 @@ impl Repository {
375381
.await
376382
}
377383

384+
#[instrument(skip_all)]
385+
pub async fn set_default_commit_metadata(
386+
&self,
387+
metadata: Option<SnapshotProperties>,
388+
) {
389+
let mut guard = self.default_commit_metadata.write().await;
390+
*guard = metadata;
391+
}
392+
393+
#[instrument(skip_all)]
394+
pub async fn default_commit_metadata(&self) -> Option<SnapshotProperties> {
395+
self.default_commit_metadata.read().await.clone()
396+
}
397+
378398
#[instrument(skip(storage, config))]
379399
pub(crate) async fn store_config(
380400
storage: &(dyn Storage + Send + Sync),
@@ -754,6 +774,7 @@ impl Repository {
754774
self.virtual_resolver.clone(),
755775
branch.to_string(),
756776
ref_data.snapshot.clone(),
777+
self.default_commit_metadata.read().await.clone(),
757778
);
758779

759780
self.preload_manifests(ref_data.snapshot);

icechunk/src/session.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ pub struct Session {
166166
branch_name: Option<String>,
167167
snapshot_id: SnapshotId,
168168
change_set: ChangeSet,
169+
default_commit_metadata: Option<SnapshotProperties>,
169170
}
170171

171172
impl Session {
@@ -186,9 +187,11 @@ impl Session {
186187
branch_name: None,
187188
snapshot_id,
188189
change_set: ChangeSet::default(),
190+
default_commit_metadata: None,
189191
}
190192
}
191193

194+
#[allow(clippy::too_many_arguments)]
192195
pub fn create_writable_session(
193196
config: RepositoryConfig,
194197
storage_settings: storage::Settings,
@@ -197,6 +200,7 @@ impl Session {
197200
virtual_resolver: Arc<VirtualChunkResolver>,
198201
branch_name: String,
199202
snapshot_id: SnapshotId,
203+
default_commit_metadata: Option<SnapshotProperties>,
200204
) -> Self {
201205
Self {
202206
config,
@@ -207,6 +211,7 @@ impl Session {
207211
branch_name: Some(branch_name),
208212
snapshot_id,
209213
change_set: ChangeSet::default(),
214+
default_commit_metadata,
210215
}
211216
}
212217

@@ -806,6 +811,17 @@ impl Session {
806811
return Err(SessionErrorKind::ReadOnlySession.into());
807812
};
808813

814+
let properties = match (properties, self.default_commit_metadata.as_ref()) {
815+
(Some(p), None) => Some(p),
816+
(None, Some(d)) => Some(d.clone()),
817+
(Some(p), Some(d)) => {
818+
let mut merged = d.clone();
819+
merged.extend(p.into_iter());
820+
Some(merged)
821+
}
822+
(None, None) => None,
823+
};
824+
809825
let current = fetch_branch_tip(
810826
self.storage.as_ref(),
811827
self.storage_settings.as_ref(),
@@ -1950,6 +1966,52 @@ mod tests {
19501966
prop_assert_eq!(to, expected_to);
19511967
}
19521968

1969+
#[tokio::test(flavor = "multi_thread")]
1970+
async fn test_repository_with_default_commit_metadata() -> Result<(), Box<dyn Error>>
1971+
{
1972+
let repo = create_memory_store_repository().await;
1973+
let mut ds = repo.writable_session("main").await?;
1974+
ds.add_group(Path::root(), Bytes::new()).await?;
1975+
let snapshot = ds.commit("commit", None).await?;
1976+
1977+
// Verify that the first commit has no metadata
1978+
let ancestry = repo.snapshot_ancestry(&snapshot).await?;
1979+
let snapshot_infos = ancestry.try_collect::<Vec<_>>().await?;
1980+
assert!(snapshot_infos[0].metadata.is_empty());
1981+
1982+
// Set some default metadata
1983+
let mut default_metadata = SnapshotProperties::default();
1984+
default_metadata.insert("author".to_string(), "John Doe".to_string().into());
1985+
default_metadata.insert("project".to_string(), "My Project".to_string().into());
1986+
repo.set_default_commit_metadata(Some(default_metadata.clone())).await;
1987+
1988+
let mut ds = repo.writable_session("main").await?;
1989+
ds.add_group("/group".try_into().unwrap(), Bytes::new()).await?;
1990+
let snapshot = ds.commit("commit", None).await?;
1991+
1992+
let snapshot_info = repo.snapshot_ancestry(&snapshot).await?;
1993+
let snapshot_infos = snapshot_info.try_collect::<Vec<_>>().await?;
1994+
assert_eq!(snapshot_infos[0].metadata, default_metadata);
1995+
1996+
// Check that metadata is merged with users provided metadata taking precedence
1997+
let mut metadata = SnapshotProperties::default();
1998+
metadata.insert("author".to_string(), "Jane Doe".to_string().into());
1999+
metadata.insert("id".to_string(), "ideded".to_string().into());
2000+
let mut ds = repo.writable_session("main").await?;
2001+
ds.add_group("/group2".try_into().unwrap(), Bytes::new()).await?;
2002+
let snapshot = ds.commit("commit", Some(metadata.clone())).await?;
2003+
2004+
let snapshot_info = repo.snapshot_ancestry(&snapshot).await?;
2005+
let snapshot_infos = snapshot_info.try_collect::<Vec<_>>().await?;
2006+
let mut expected_result = SnapshotProperties::default();
2007+
expected_result.insert("author".to_string(), "Jane Doe".to_string().into());
2008+
expected_result.insert("project".to_string(), "My Project".to_string().into());
2009+
expected_result.insert("id".to_string(), "ideded".to_string().into());
2010+
assert_eq!(snapshot_infos[0].metadata, expected_result);
2011+
2012+
Ok(())
2013+
}
2014+
19532015
#[tokio::test(flavor = "multi_thread")]
19542016
async fn test_repository_with_updates() -> Result<(), Box<dyn Error>> {
19552017
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;

0 commit comments

Comments
 (0)