Skip to content

Commit a3bf829

Browse files
ZENOTMEZENOTME
andauthored
refine: add new_manifest_writer in SnapshotProducer (#1481)
## Which issue does this PR close? This PR add new_manifest_writer in SnapshotProducer and this function can be used to create different manifset writer for different action in the future, e.g. MergeAppend #902 ## What changes are included in this PR? ## Are these changes tested? Co-authored-by: ZENOTME <st810918843@gmail.com>
1 parent 36cc120 commit a3bf829

File tree

1 file changed

+27
-21
lines changed

1 file changed

+27
-21
lines changed

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ use std::ops::RangeFrom;
2222
use uuid::Uuid;
2323

2424
use crate::error::Result;
25-
use crate::io::OutputFile;
2625
use crate::spec::{
27-
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestEntry, ManifestFile,
28-
ManifestListWriter, ManifestWriterBuilder, Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
29-
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
30-
SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
26+
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
27+
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation,
28+
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
29+
Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType,
30+
Summary, update_snapshot_summaries,
3131
};
3232
use crate::table::Table;
3333
use crate::transaction::ActionCommit;
@@ -177,7 +177,11 @@ impl SnapshotProducer {
177177
snapshot_id
178178
}
179179

180-
fn new_manifest_output(&mut self, table: &Table) -> Result<OutputFile> {
180+
fn new_manifest_writer(
181+
&mut self,
182+
content: ManifestContentType,
183+
table: &Table,
184+
) -> Result<ManifestWriter> {
181185
let new_manifest_path = format!(
182186
"{}/{}/{}-m{}.{}",
183187
table.metadata().location(),
@@ -186,7 +190,22 @@ impl SnapshotProducer {
186190
self.manifest_counter.next().unwrap(),
187191
DataFileFormat::Avro
188192
);
189-
table.file_io().new_output(new_manifest_path)
193+
let output_file = table.file_io().new_output(new_manifest_path)?;
194+
let builder = ManifestWriterBuilder::new(
195+
output_file,
196+
Some(self.snapshot_id),
197+
self.key_metadata.clone(),
198+
table.metadata().current_schema().clone(),
199+
table.metadata().default_partition_spec().as_ref().clone(),
200+
);
201+
if table.metadata().format_version() == FormatVersion::V1 {
202+
Ok(builder.build_v1())
203+
} else {
204+
match content {
205+
ManifestContentType::Data => Ok(builder.build_v2_data()),
206+
ManifestContentType::Deletes => Ok(builder.build_v2_deletes()),
207+
}
208+
}
190209
}
191210

192211
// Check if the partition value is compatible with the partition type.
@@ -244,20 +263,7 @@ impl SnapshotProducer {
244263
builder.build()
245264
}
246265
});
247-
let mut writer = {
248-
let builder = ManifestWriterBuilder::new(
249-
self.new_manifest_output(table)?,
250-
Some(self.snapshot_id),
251-
self.key_metadata.clone(),
252-
table.metadata().current_schema().clone(),
253-
table.metadata().default_partition_spec().as_ref().clone(),
254-
);
255-
if table.metadata().format_version() == FormatVersion::V1 {
256-
builder.build_v1()
257-
} else {
258-
builder.build_v2_data()
259-
}
260-
};
266+
let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?;
261267
for entry in manifest_entries {
262268
writer.add_entry(entry)?;
263269
}

0 commit comments

Comments
 (0)