Skip to content
Merged
19 changes: 14 additions & 5 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,18 +553,27 @@ impl PyRepository {
#[pyo3(signature = (metadata))]
pub fn set_default_commit_metadata(
&self,
py: Python<'_>,
metadata: Option<PySnapshotProperties>,
) -> PyResult<()> {
let metadata = metadata.map(|m| m.into());
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
self.0.set_default_commit_metadata(metadata).await;
py.allow_threads(move || {
self.0
.set_default_commit_metadata(metadata)
.map_err(PyIcechunkStoreError::RepositoryError)?;
Ok(())
})
}

pub fn default_commit_metadata(&self) -> PyResult<Option<PySnapshotProperties>> {
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let metadata = self.0.default_commit_metadata().await;
pub fn default_commit_metadata(
&self,
py: Python<'_>,
) -> PyResult<Option<PySnapshotProperties>> {
py.allow_threads(move || {
let metadata = self
.0
.default_commit_metadata()
.map_err(PyIcechunkStoreError::RepositoryError)?;
Ok(metadata.map(|m| m.into()))
})
}
Expand Down
6 changes: 5 additions & 1 deletion icechunk-python/tests/test_pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@


def create_local_repo(path: str) -> Repository:
return Repository.create(storage=local_filesystem_storage(path))
repo = Repository.create(storage=local_filesystem_storage(path))
repo.set_default_commit_metadata({"author": "test"})
return repo


@pytest.fixture(scope="function")
Expand All @@ -29,6 +31,8 @@ def test_pickle_repository(tmpdir: Path, tmp_repo: Repository) -> None:
pickled = pickle.dumps(tmp_repo)
roundtripped = pickle.loads(pickled)
assert tmp_repo.list_branches() == roundtripped.list_branches()
assert tmp_repo.default_commit_metadata() == roundtripped.default_commit_metadata()
assert tmp_repo.default_commit_metadata() == {"author": "test"}

storage = tmp_repo.storage
assert (
Expand Down
36 changes: 24 additions & 12 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
collections::{BTreeSet, HashMap, HashSet},
future::ready,
ops::RangeBounds,
sync::Arc,
sync::{Arc, Mutex},
};

use async_recursion::async_recursion;
Expand All @@ -16,7 +16,6 @@ use futures::{
use regex::bytes::Regex;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::task::JoinError;
use tracing::{Instrument, debug, error, instrument, trace};

Expand Down Expand Up @@ -60,7 +59,8 @@ pub enum RepositoryErrorKind {
FormatError(IcechunkFormatErrorKind),
#[error(transparent)]
Ref(RefErrorKind),

#[error("failed to obtain mutex lock")]
SyncError,
#[error("snapshot not found: `{id}`")]
SnapshotNotFound { id: SnapshotId },
#[error("branch {branch} does not have a snapshots before or at {at}")]
Expand Down Expand Up @@ -139,8 +139,7 @@ pub struct Repository {
asset_manager: Arc<AssetManager>,
virtual_resolver: Arc<VirtualChunkResolver>,
virtual_chunk_credentials: HashMap<ContainerName, Credentials>,
#[serde(skip)]
default_commit_metadata: Arc<RwLock<Option<SnapshotProperties>>>,
default_commit_metadata: Arc<Mutex<Option<SnapshotProperties>>>,
Copy link
Contributor

@dcherian dcherian Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Mutex over RwLock here? I guess in general we don't expect multiple readers here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah exactly. And here we can't use the async versions because of serialization. I can use rwlock if @paraseba would prefer tho

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need locking? Can't the set_default_commit_metadata just take a &mut self? I think that would be the cleanest option, then PyRepository is the one that needs locking to call this method on the repo, which seems also right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we dont currently lock PyRepository at all and its a big change to introduce it. We use internal mutability everywhere

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what i'm proposing is maybe a smaller change? Repository today doesn't have internal mutability (which I like), it can easily be used from Rust in "the usual" way. But things like PySession hold a lock to the underlying Rust datastructure. What I'm proposing is we do the same for PyRepository, bringing these two to the same style:

pub struct PyRepository(Arc<Repository>);
pub struct PySession(pub Arc<RwLock<Session>>);

I may be missing something that would make this hard...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry thats what i meant. I was trying to avoid having to lock the Repository but i can do if we want it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge and I can give it a try tomorrow if you are busy. But, i'd prefer if we don't introduce internal mutability into the Repo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can do it in this pr then! Thanks for the feedback

}

impl Repository {
Expand Down Expand Up @@ -312,7 +311,7 @@ impl Repository {
virtual_resolver,
asset_manager,
virtual_chunk_credentials,
default_commit_metadata: Arc::new(RwLock::new(None)),
default_commit_metadata: Arc::new(Mutex::new(None)),
})
}

Expand Down Expand Up @@ -382,17 +381,27 @@ impl Repository {
}

#[instrument(skip_all)]
pub async fn set_default_commit_metadata(
pub fn set_default_commit_metadata(
&self,
metadata: Option<SnapshotProperties>,
) {
let mut guard = self.default_commit_metadata.write().await;
) -> RepositoryResult<()> {
let mut guard = self
.default_commit_metadata
.lock()
.map_err(|_| RepositoryErrorKind::SyncError)?;
*guard = metadata;
Ok(())
}

#[instrument(skip_all)]
pub async fn default_commit_metadata(&self) -> Option<SnapshotProperties> {
self.default_commit_metadata.read().await.clone()
pub fn default_commit_metadata(
&self,
) -> RepositoryResult<Option<SnapshotProperties>> {
let guard = self
.default_commit_metadata
.lock()
.map_err(|_| RepositoryErrorKind::SyncError)?;
Ok(guard.clone())
}

#[instrument(skip(storage, config))]
Expand Down Expand Up @@ -774,7 +783,10 @@ impl Repository {
self.virtual_resolver.clone(),
branch.to_string(),
ref_data.snapshot.clone(),
self.default_commit_metadata.read().await.clone(),
self.default_commit_metadata
.lock()
.map_err(|_| RepositoryErrorKind::SyncError)?
.clone(),
);

self.preload_manifests(ref_data.snapshot);
Expand Down
2 changes: 1 addition & 1 deletion icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,7 @@ mod tests {
let mut default_metadata = SnapshotProperties::default();
default_metadata.insert("author".to_string(), "John Doe".to_string().into());
default_metadata.insert("project".to_string(), "My Project".to_string().into());
repo.set_default_commit_metadata(Some(default_metadata.clone())).await;
repo.set_default_commit_metadata(Some(default_metadata.clone()))?;

let mut ds = repo.writable_session("main").await?;
ds.add_group("/group".try_into().unwrap(), Bytes::new()).await?;
Expand Down