Skip to content

refine: refine the interface of SnapshotProducer #1490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ impl FastAppendAction {
#[async_trait]
impl TransactionAction for FastAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
// validate added files
SnapshotProducer::validate_added_data_files(table, &self.added_data_files)?;

// Checks duplicate files
if self.check_duplicate {
SnapshotProducer::validate_duplicate_files(table, &self.added_data_files).await?;
}

let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
Expand All @@ -100,8 +92,18 @@ impl TransactionAction for FastAppendAction {
self.added_data_files.clone(),
);

// validate added files
snapshot_producer.validate_added_data_files(&self.added_data_files)?;

// Checks duplicate files
if self.check_duplicate {
snapshot_producer
.validate_duplicate_files(&self.added_data_files)
.await?;
}

snapshot_producer
.commit(table, FastAppendOperation, DefaultManifestProcess)
.commit(FastAppendOperation, DefaultManifestProcess)
.await
}
}
Expand All @@ -115,18 +117,24 @@ impl SnapshotProduceOperation for FastAppendOperation {

async fn delete_entries(
&self,
_snapshot_produce: &SnapshotProducer,
_snapshot_produce: &SnapshotProducer<'_>,
) -> Result<Vec<ManifestEntry>> {
Ok(vec![])
}

async fn existing_manifest(&self, table: &Table) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = table.metadata().current_snapshot() else {
async fn existing_manifest(
&self,
snapshot_produce: &SnapshotProducer<'_>,
) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
return Ok(vec![]);
};

let manifest_list = snapshot
.load_manifest_list(table.file_io(), &table.metadata_ref())
.load_manifest_list(
snapshot_produce.table.file_io(),
&snapshot_produce.table.metadata_ref(),
)
.await?;

Ok(manifest_list
Expand Down
119 changes: 63 additions & 56 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,32 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
fn existing_manifest(
&self,
table: &Table,
snapshot_produce: &SnapshotProducer<'_>,
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
}

pub(crate) struct DefaultManifestProcess;

impl ManifestProcess for DefaultManifestProcess {
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
fn process_manifests(
&self,
_snapshot_produce: &SnapshotProducer<'_>,
manifests: Vec<ManifestFile>,
) -> Vec<ManifestFile> {
manifests
}
}

pub(crate) trait ManifestProcess: Send + Sync {
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
fn process_manifests(
&self,
snapshot_produce: &SnapshotProducer<'_>,
manifests: Vec<ManifestFile>,
) -> Vec<ManifestFile>;
}

pub(crate) struct SnapshotProducer {
pub(crate) struct SnapshotProducer<'a> {
pub(crate) table: &'a Table,
snapshot_id: i64,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
Expand All @@ -72,15 +81,16 @@ pub(crate) struct SnapshotProducer {
manifest_counter: RangeFrom<u64>,
}

impl SnapshotProducer {
impl<'a> SnapshotProducer<'a> {
pub(crate) fn new(
table: &Table,
table: &'a Table,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
) -> Self {
Self {
table,
snapshot_id: Self::generate_unique_snapshot_id(table),
commit_uuid,
key_metadata,
Expand All @@ -90,10 +100,7 @@ impl SnapshotProducer {
}
}

pub(crate) fn validate_added_data_files(
table: &Table,
added_data_files: &[DataFile],
) -> Result<()> {
pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> {
for data_file in added_data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
Expand All @@ -102,23 +109,23 @@ impl SnapshotProducer {
));
}
// Check if the data file partition spec id matches the table default partition spec id.
if table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Data file partition spec id does not match table default partition spec id",
));
}
Self::validate_partition_value(
data_file.partition(),
table.metadata().default_partition_type(),
self.table.metadata().default_partition_type(),
)?;
}

Ok(())
}

pub(crate) async fn validate_duplicate_files(
table: &Table,
&self,
added_data_files: &[DataFile],
) -> Result<()> {
let new_files: HashSet<&str> = added_data_files
Expand All @@ -127,12 +134,14 @@ impl SnapshotProducer {
.collect();

let mut referenced_files = Vec::new();
if let Some(current_snapshot) = table.metadata().current_snapshot() {
if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = current_snapshot
.load_manifest_list(table.file_io(), &table.metadata_ref())
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest_list_entry in manifest_list.entries() {
let manifest = manifest_list_entry.load_manifest(table.file_io()).await?;
let manifest = manifest_list_entry
.load_manifest(self.table.file_io())
.await?;
for entry in manifest.entries() {
let file_path = entry.file_path();
if new_files.contains(file_path) && entry.is_alive() {
Expand Down Expand Up @@ -177,28 +186,28 @@ impl SnapshotProducer {
snapshot_id
}

fn new_manifest_writer(
&mut self,
content: ManifestContentType,
table: &Table,
) -> Result<ManifestWriter> {
fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
let new_manifest_path = format!(
"{}/{}/{}-m{}.{}",
table.metadata().location(),
self.table.metadata().location(),
META_ROOT_PATH,
self.commit_uuid,
self.manifest_counter.next().unwrap(),
DataFileFormat::Avro
);
let output_file = table.file_io().new_output(new_manifest_path)?;
let output_file = self.table.file_io().new_output(new_manifest_path)?;
let builder = ManifestWriterBuilder::new(
output_file,
Some(self.snapshot_id),
self.key_metadata.clone(),
table.metadata().current_schema().clone(),
table.metadata().default_partition_spec().as_ref().clone(),
self.table.metadata().current_schema().clone(),
self.table
.metadata()
.default_partition_spec()
.as_ref()
.clone(),
);
if table.metadata().format_version() == FormatVersion::V1 {
if self.table.metadata().format_version() == FormatVersion::V1 {
Ok(builder.build_v1())
} else {
match content {
Expand Down Expand Up @@ -240,7 +249,7 @@ impl SnapshotProducer {
}

// Write manifest file for added data files and return the ManifestFile for ManifestList.
async fn write_added_manifest(&mut self, table: &Table) -> Result<ManifestFile> {
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
if added_data_files.is_empty() {
return Err(Error::new(
Expand All @@ -250,7 +259,7 @@ impl SnapshotProducer {
}

let snapshot_id = self.snapshot_id;
let format_version = table.metadata().format_version();
let format_version = self.table.metadata().format_version();
let manifest_entries = added_data_files.into_iter().map(|data_file| {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
Expand All @@ -263,7 +272,7 @@ impl SnapshotProducer {
builder.build()
}
});
let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?;
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
for entry in manifest_entries {
writer.add_entry(entry)?;
}
Expand All @@ -272,29 +281,27 @@ impl SnapshotProducer {

async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
&mut self,
table: &Table,
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
let added_manifest = self.write_added_manifest(table).await?;
let existing_manifests = snapshot_produce_operation.existing_manifest(table).await?;
let added_manifest = self.write_added_manifest().await?;
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
// # TODO
// Support process delete entries.

let mut manifest_files = vec![added_manifest];
manifest_files.extend(existing_manifests);
let manifest_files = manifest_process.process_manifests(manifest_files);
let manifest_files = manifest_process.process_manifests(self, manifest_files);
Ok(manifest_files)
}

// Returns a `Summary` of the current snapshot
fn summary<OP: SnapshotProduceOperation>(
&self,
table: &Table,
snapshot_produce_operation: &OP,
) -> Result<Summary> {
let mut summary_collector = SnapshotSummaryCollector::default();
let table_metadata = table.metadata_ref();
let table_metadata = self.table.metadata_ref();

let partition_summary_limit = if let Some(limit) = table_metadata
.properties()
Expand Down Expand Up @@ -339,10 +346,10 @@ impl SnapshotProducer {
)
}

fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String {
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
format!(
"{}/{}/snap-{}-{}-{}.{}",
table.metadata().location(),
self.table.metadata().location(),
META_ROOT_PATH,
self.snapshot_id,
attempt,
Expand All @@ -354,34 +361,34 @@ impl SnapshotProducer {
/// Finished building the action and return the [`ActionCommit`] to the transaction.
pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
mut self,
table: &Table,
snapshot_produce_operation: OP,
process: MP,
) -> Result<ActionCommit> {
let new_manifests = self
.manifest_file(table, &snapshot_produce_operation, &process)
.manifest_file(&snapshot_produce_operation, &process)
.await?;
let next_seq_num = table.metadata().next_sequence_number();
let next_seq_num = self.table.metadata().next_sequence_number();

let summary = self
.summary(table, &snapshot_produce_operation)
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
.with_source(err)
})?;
let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err)
})?;

let manifest_list_path = self.generate_manifest_list_file_path(table, 0);
let manifest_list_path = self.generate_manifest_list_file_path(0);

let mut manifest_list_writer = match table.metadata().format_version() {
let mut manifest_list_writer = match self.table.metadata().format_version() {
FormatVersion::V1 => ManifestListWriter::v1(
table.file_io().new_output(manifest_list_path.clone())?,
self.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
table.metadata().current_snapshot_id(),
self.table.metadata().current_snapshot_id(),
),
FormatVersion::V2 => ManifestListWriter::v2(
table.file_io().new_output(manifest_list_path.clone())?,
self.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
table.metadata().current_snapshot_id(),
self.table.metadata().current_snapshot_id(),
next_seq_num,
),
};
Expand All @@ -392,10 +399,10 @@ impl SnapshotProducer {
let new_snapshot = Snapshot::builder()
.with_manifest_list(manifest_list_path)
.with_snapshot_id(self.snapshot_id)
.with_parent_snapshot_id(table.metadata().current_snapshot_id())
.with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
.with_sequence_number(next_seq_num)
.with_summary(summary)
.with_schema_id(table.metadata().current_schema_id())
.with_schema_id(self.table.metadata().current_schema_id())
.with_timestamp_ms(commit_ts)
.build();

Expand All @@ -414,11 +421,11 @@ impl SnapshotProducer {

let requirements = vec![
TableRequirement::UuidMatch {
uuid: table.metadata().uuid(),
uuid: self.table.metadata().uuid(),
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: table.metadata().current_snapshot_id(),
snapshot_id: self.table.metadata().current_snapshot_id(),
},
];

Expand Down
Loading