Skip to content

Commit fd1ac4f

Browse files
author
ZENOTME
committed
refine the interface of SnapshotProducer
1 parent d9fbc5c commit fd1ac4f

File tree

2 files changed

+65
-51
lines changed

2 files changed

+65
-51
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl TransactionAction for FastAppendAction {
101101
);
102102

103103
snapshot_producer
104-
.commit(table, FastAppendOperation, DefaultManifestProcess)
104+
.commit(FastAppendOperation, DefaultManifestProcess)
105105
.await
106106
}
107107
}
@@ -115,18 +115,24 @@ impl SnapshotProduceOperation for FastAppendOperation {
115115

116116
async fn delete_entries(
117117
&self,
118-
_snapshot_produce: &SnapshotProducer,
118+
_snapshot_produce: &SnapshotProducer<'_>,
119119
) -> Result<Vec<ManifestEntry>> {
120120
Ok(vec![])
121121
}
122122

123-
async fn existing_manifest(&self, table: &Table) -> Result<Vec<ManifestFile>> {
124-
let Some(snapshot) = table.metadata().current_snapshot() else {
123+
async fn existing_manifest(
124+
&self,
125+
snapshot_produce: &SnapshotProducer<'_>,
126+
) -> Result<Vec<ManifestFile>> {
127+
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
125128
return Ok(vec![]);
126129
};
127130

128131
let manifest_list = snapshot
129-
.load_manifest_list(table.file_io(), &table.metadata_ref())
132+
.load_manifest_list(
133+
snapshot_produce.table.file_io(),
134+
&snapshot_produce.table.metadata_ref(),
135+
)
130136
.await?;
131137

132138
Ok(manifest_list

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,32 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
4444
) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
4545
fn existing_manifest(
4646
&self,
47-
table: &Table,
47+
snapshot_produce: &SnapshotProducer<'_>,
4848
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
4949
}
5050

5151
pub(crate) struct DefaultManifestProcess;
5252

5353
impl ManifestProcess for DefaultManifestProcess {
54-
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
54+
fn process_manifests(
55+
&self,
56+
_snapshot_produce: &SnapshotProducer<'_>,
57+
manifests: Vec<ManifestFile>,
58+
) -> Vec<ManifestFile> {
5559
manifests
5660
}
5761
}
5862

5963
pub(crate) trait ManifestProcess: Send + Sync {
60-
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
64+
fn process_manifests(
65+
&self,
66+
snapshot_produce: &SnapshotProducer<'_>,
67+
manifests: Vec<ManifestFile>,
68+
) -> Vec<ManifestFile>;
6169
}
6270

63-
pub(crate) struct SnapshotProducer {
71+
pub(crate) struct SnapshotProducer<'a> {
72+
pub(crate) table: &'a Table,
6473
snapshot_id: i64,
6574
commit_uuid: Uuid,
6675
key_metadata: Option<Vec<u8>>,
@@ -72,15 +81,16 @@ pub(crate) struct SnapshotProducer {
7281
manifest_counter: RangeFrom<u64>,
7382
}
7483

75-
impl SnapshotProducer {
84+
impl<'a> SnapshotProducer<'a> {
7685
pub(crate) fn new(
77-
table: &Table,
86+
table: &'a Table,
7887
commit_uuid: Uuid,
7988
key_metadata: Option<Vec<u8>>,
8089
snapshot_properties: HashMap<String, String>,
8190
added_data_files: Vec<DataFile>,
8291
) -> Self {
8392
Self {
93+
table,
8494
snapshot_id: Self::generate_unique_snapshot_id(table),
8595
commit_uuid,
8696
key_metadata,
@@ -177,28 +187,28 @@ impl SnapshotProducer {
177187
snapshot_id
178188
}
179189

180-
fn new_manifest_writer(
181-
&mut self,
182-
content: ManifestContentType,
183-
table: &Table,
184-
) -> Result<ManifestWriter> {
190+
fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
185191
let new_manifest_path = format!(
186192
"{}/{}/{}-m{}.{}",
187-
table.metadata().location(),
193+
self.table.metadata().location(),
188194
META_ROOT_PATH,
189195
self.commit_uuid,
190196
self.manifest_counter.next().unwrap(),
191197
DataFileFormat::Avro
192198
);
193-
let output_file = table.file_io().new_output(new_manifest_path)?;
199+
let output_file = self.table.file_io().new_output(new_manifest_path)?;
194200
let builder = ManifestWriterBuilder::new(
195201
output_file,
196202
Some(self.snapshot_id),
197203
self.key_metadata.clone(),
198-
table.metadata().current_schema().clone(),
199-
table.metadata().default_partition_spec().as_ref().clone(),
204+
self.table.metadata().current_schema().clone(),
205+
self.table
206+
.metadata()
207+
.default_partition_spec()
208+
.as_ref()
209+
.clone(),
200210
);
201-
if table.metadata().format_version() == FormatVersion::V1 {
211+
if self.table.metadata().format_version() == FormatVersion::V1 {
202212
Ok(builder.build_v1())
203213
} else {
204214
match content {
@@ -240,7 +250,7 @@ impl SnapshotProducer {
240250
}
241251

242252
// Write manifest file for added data files and return the ManifestFile for ManifestList.
243-
async fn write_added_manifest(&mut self, table: &Table) -> Result<ManifestFile> {
253+
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
244254
let added_data_files = std::mem::take(&mut self.added_data_files);
245255
if added_data_files.is_empty() {
246256
return Err(Error::new(
@@ -250,7 +260,7 @@ impl SnapshotProducer {
250260
}
251261

252262
let snapshot_id = self.snapshot_id;
253-
let format_version = table.metadata().format_version();
263+
let format_version = self.table.metadata().format_version();
254264
let manifest_entries = added_data_files.into_iter().map(|data_file| {
255265
let builder = ManifestEntry::builder()
256266
.status(crate::spec::ManifestStatus::Added)
@@ -263,7 +273,7 @@ impl SnapshotProducer {
263273
builder.build()
264274
}
265275
});
266-
let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?;
276+
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
267277
for entry in manifest_entries {
268278
writer.add_entry(entry)?;
269279
}
@@ -272,29 +282,27 @@ impl SnapshotProducer {
272282

273283
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
274284
&mut self,
275-
table: &Table,
276285
snapshot_produce_operation: &OP,
277286
manifest_process: &MP,
278287
) -> Result<Vec<ManifestFile>> {
279-
let added_manifest = self.write_added_manifest(table).await?;
280-
let existing_manifests = snapshot_produce_operation.existing_manifest(table).await?;
288+
let added_manifest = self.write_added_manifest().await?;
289+
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
281290
// # TODO
282291
// Support process delete entries.
283292

284293
let mut manifest_files = vec![added_manifest];
285294
manifest_files.extend(existing_manifests);
286-
let manifest_files = manifest_process.process_manifests(manifest_files);
295+
let manifest_files = manifest_process.process_manifests(self, manifest_files);
287296
Ok(manifest_files)
288297
}
289298

290299
// Returns a `Summary` of the current snapshot
291300
fn summary<OP: SnapshotProduceOperation>(
292301
&self,
293-
table: &Table,
294302
snapshot_produce_operation: &OP,
295303
) -> Result<Summary> {
296304
let mut summary_collector = SnapshotSummaryCollector::default();
297-
let table_metadata = table.metadata_ref();
305+
let table_metadata = self.table.metadata_ref();
298306

299307
let partition_summary_limit = if let Some(limit) = table_metadata
300308
.properties()
@@ -339,10 +347,10 @@ impl SnapshotProducer {
339347
)
340348
}
341349

342-
fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String {
350+
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
343351
format!(
344352
"{}/{}/snap-{}-{}-{}.{}",
345-
table.metadata().location(),
353+
self.table.metadata().location(),
346354
META_ROOT_PATH,
347355
self.snapshot_id,
348356
attempt,
@@ -354,34 +362,34 @@ impl SnapshotProducer {
354362
/// Finished building the action and return the [`ActionCommit`] to the transaction.
355363
pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
356364
mut self,
357-
table: &Table,
358365
snapshot_produce_operation: OP,
359366
process: MP,
360367
) -> Result<ActionCommit> {
361368
let new_manifests = self
362-
.manifest_file(table, &snapshot_produce_operation, &process)
369+
.manifest_file(&snapshot_produce_operation, &process)
363370
.await?;
364-
let next_seq_num = table.metadata().next_sequence_number();
371+
let next_seq_num = self.table.metadata().next_sequence_number();
365372

366-
let summary = self
367-
.summary(table, &snapshot_produce_operation)
368-
.map_err(|err| {
369-
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
370-
.with_source(err)
371-
})?;
373+
let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
374+
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err)
375+
})?;
372376

373-
let manifest_list_path = self.generate_manifest_list_file_path(table, 0);
377+
let manifest_list_path = self.generate_manifest_list_file_path(0);
374378

375-
let mut manifest_list_writer = match table.metadata().format_version() {
379+
let mut manifest_list_writer = match self.table.metadata().format_version() {
376380
FormatVersion::V1 => ManifestListWriter::v1(
377-
table.file_io().new_output(manifest_list_path.clone())?,
381+
self.table
382+
.file_io()
383+
.new_output(manifest_list_path.clone())?,
378384
self.snapshot_id,
379-
table.metadata().current_snapshot_id(),
385+
self.table.metadata().current_snapshot_id(),
380386
),
381387
FormatVersion::V2 => ManifestListWriter::v2(
382-
table.file_io().new_output(manifest_list_path.clone())?,
388+
self.table
389+
.file_io()
390+
.new_output(manifest_list_path.clone())?,
383391
self.snapshot_id,
384-
table.metadata().current_snapshot_id(),
392+
self.table.metadata().current_snapshot_id(),
385393
next_seq_num,
386394
),
387395
};
@@ -392,10 +400,10 @@ impl SnapshotProducer {
392400
let new_snapshot = Snapshot::builder()
393401
.with_manifest_list(manifest_list_path)
394402
.with_snapshot_id(self.snapshot_id)
395-
.with_parent_snapshot_id(table.metadata().current_snapshot_id())
403+
.with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
396404
.with_sequence_number(next_seq_num)
397405
.with_summary(summary)
398-
.with_schema_id(table.metadata().current_schema_id())
406+
.with_schema_id(self.table.metadata().current_schema_id())
399407
.with_timestamp_ms(commit_ts)
400408
.build();
401409

@@ -414,11 +422,11 @@ impl SnapshotProducer {
414422

415423
let requirements = vec![
416424
TableRequirement::UuidMatch {
417-
uuid: table.metadata().uuid(),
425+
uuid: self.table.metadata().uuid(),
418426
},
419427
TableRequirement::RefSnapshotIdMatch {
420428
r#ref: MAIN_BRANCH.to_string(),
421-
snapshot_id: table.metadata().current_snapshot_id(),
429+
snapshot_id: self.table.metadata().current_snapshot_id(),
422430
},
423431
];
424432

0 commit comments

Comments
 (0)