Skip to content

Commit 19660f2

Browse files
ZENOTMExxchan
authored andcommitted
support append delete file (#19)
Co-authored-by: ZENOTME <st810918843@gmail.com> Signed-off-by: xxchan <xxchan22f@gmail.com>
1 parent 16bb978 commit 19660f2

File tree

1 file changed

+51
-18
lines changed

1 file changed

+51
-18
lines changed

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ use uuid::Uuid;
2424
use crate::error::Result;
2525
use crate::io::OutputFile;
2626
use crate::spec::{
27-
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
28-
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
29-
StructType, Summary, MAIN_BRANCH,
27+
DataContentType, DataFile, DataFileFormat, FormatVersion, ManifestContentType, ManifestEntry,
28+
ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot,
29+
SnapshotReference, SnapshotRetention, Struct, StructType, Summary, MAIN_BRANCH,
3030
};
3131
use crate::transaction::Transaction;
3232
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
@@ -65,6 +65,7 @@ pub(crate) struct SnapshotProduceAction<'a> {
6565
commit_uuid: Uuid,
6666
snapshot_properties: HashMap<String, String>,
6767
pub added_data_files: Vec<DataFile>,
68+
added_delete_files: Vec<DataFile>,
6869
// A counter used to generate unique manifest file names.
6970
// It starts from 0 and increments for each new manifest file.
7071
// Note: This counter is limited to the range of (0..u64::MAX).
@@ -85,6 +86,7 @@ impl<'a> SnapshotProduceAction<'a> {
8586
commit_uuid,
8687
snapshot_properties,
8788
added_data_files: vec![],
89+
added_delete_files: vec![],
8890
manifest_counter: (0..),
8991
key_metadata,
9092
})
@@ -129,13 +131,7 @@ impl<'a> SnapshotProduceAction<'a> {
129131
data_files: impl IntoIterator<Item = DataFile>,
130132
) -> Result<&mut Self> {
131133
let data_files: Vec<DataFile> = data_files.into_iter().collect();
132-
for data_file in &data_files {
133-
if data_file.content_type() != crate::spec::DataContentType::Data {
134-
return Err(Error::new(
135-
ErrorKind::DataInvalid,
136-
"Only data content type is allowed for fast append",
137-
));
138-
}
134+
for data_file in data_files {
139135
// Check if the data file partition spec id matches the table default partition spec id.
140136
if self.tx.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
141137
return Err(Error::new(
@@ -147,8 +143,12 @@ impl<'a> SnapshotProduceAction<'a> {
147143
data_file.partition(),
148144
self.tx.table.metadata().default_partition_type(),
149145
)?;
146+
if data_file.content_type() == DataContentType::Data {
147+
self.added_data_files.push(data_file);
148+
} else {
149+
self.added_delete_files.push(data_file);
150+
}
150151
}
151-
self.added_data_files.extend(data_files);
152152
Ok(self)
153153
}
154154

@@ -165,9 +165,32 @@ impl<'a> SnapshotProduceAction<'a> {
165165
}
166166

167167
// Write manifest file for added data files and return the ManifestFile for ManifestList.
168-
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
169-
let added_data_files = std::mem::take(&mut self.added_data_files);
168+
async fn write_added_manifest(
169+
&mut self,
170+
added_data_files: Vec<DataFile>,
171+
) -> Result<ManifestFile> {
170172
let snapshot_id = self.snapshot_id;
173+
let content_type = {
174+
let mut data_num = 0;
175+
let mut delete_num = 0;
176+
for f in &added_data_files {
177+
match f.content_type() {
178+
DataContentType::Data => data_num += 1,
179+
DataContentType::PositionDeletes => delete_num += 1,
180+
DataContentType::EqualityDeletes => delete_num += 1,
181+
}
182+
}
183+
if data_num == added_data_files.len() {
184+
ManifestContentType::Data
185+
} else if delete_num == added_data_files.len() {
186+
ManifestContentType::Deletes
187+
} else {
188+
return Err(Error::new(
189+
ErrorKind::DataInvalid,
190+
"added DataFile for a ManifestFile should be same type (Data or Delete)",
191+
));
192+
}
193+
};
171194
let manifest_entries = added_data_files.into_iter().map(|data_file| {
172195
let builder = ManifestEntry::builder()
173196
.status(crate::spec::ManifestStatus::Added)
@@ -196,7 +219,10 @@ impl<'a> SnapshotProduceAction<'a> {
196219
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
197220
builder.build_v1()
198221
} else {
199-
builder.build_v2_data()
222+
match content_type {
223+
ManifestContentType::Data => builder.build_v2_data(),
224+
ManifestContentType::Deletes => builder.build_v2_deletes(),
225+
}
200226
}
201227
};
202228
for entry in manifest_entries {
@@ -210,12 +236,19 @@ impl<'a> SnapshotProduceAction<'a> {
210236
snapshot_produce_operation: &OP,
211237
manifest_process: &MP,
212238
) -> Result<Vec<ManifestFile>> {
213-
let added_manifest = self.write_added_manifest().await?;
239+
let mut manifest_files = vec![];
240+
let data_files = std::mem::take(&mut self.added_data_files);
241+
let delete_files = std::mem::take(&mut self.added_delete_files);
242+
if !data_files.is_empty() {
243+
let added_manifest = self.write_added_manifest(data_files).await?;
244+
manifest_files.push(added_manifest);
245+
}
246+
if !delete_files.is_empty() {
247+
let added_delete_manifest = self.write_added_manifest(delete_files).await?;
248+
manifest_files.push(added_delete_manifest);
249+
}
214250
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
215-
// # TODO
216-
// Support process delete entries.
217251

218-
let mut manifest_files = vec![added_manifest];
219252
manifest_files.extend(existing_manifests);
220253
let manifest_files = manifest_process.process_manifeset(manifest_files);
221254
Ok(manifest_files)

0 commit comments

Comments
 (0)