Skip to content

Commit c6ac066

Browse files
ZENOTMEZENOTMECopilot
authored andcommitted
feat: add process delete enrty in snapshot produce (#33)
* support spec id in data file * support proccess delete entry * fullfill partition spec id * fix * fix spelling mistake Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: ZENOTME <st810918843@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: xxchan <xxchan22f@gmail.com>
1 parent 1f4d225 commit c6ac066

File tree

11 files changed

+120
-22
lines changed

11 files changed

+120
-22
lines changed

crates/iceberg/src/arrow/record_batch_partition_spliter.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
2727

2828
use super::record_batch_projector::RecordBatchProjector;
2929
use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
30-
use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
30+
use crate::spec::{Literal, PartitionSpec, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
3131
use crate::transform::{create_transform_function, BoxedTransformFunction};
3232
use crate::{Error, ErrorKind, Result};
3333

@@ -186,6 +186,10 @@ impl RecordBatchPartitionSpliter {
186186
})
187187
}
188188

189+
pub(crate) fn partition_spec(&self) -> &PartitionSpec {
190+
self.partition_spec.as_ref()
191+
}
192+
189193
/// Split the record batch into multiple record batches by the partition spec.
190194
pub(crate) fn split(&self, batch: &RecordBatch) -> Result<Vec<(OwnedRow, RecordBatch)>> {
191195
// get array using partition spec

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,10 @@ impl DataFile {
239239
pub(crate) fn rewrite_partition(&mut self, partition: Struct) {
240240
self.partition = partition;
241241
}
242+
243+
pub(crate) fn rewrite_partition_id(&mut self, partition_spec_id: i32) {
244+
self.partition_spec_id = partition_spec_id;
245+
}
242246
}
243247

244248
/// Convert data files to avro bytes and write to writer.

crates/iceberg/src/transaction/append.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ mod tests {
310310

311311
// check add data file with incompatible partition value
312312
let data_file = DataFileBuilder::default()
313+
.partition_spec_id(0)
313314
.content(DataContentType::Data)
314315
.file_path("test/3.parquet".to_string())
315316
.file_format(DataFileFormat::Parquet)
@@ -322,6 +323,7 @@ mod tests {
322323
assert!(action.add_data_files(vec![data_file.clone()]).is_err());
323324

324325
let data_file = DataFileBuilder::default()
326+
.partition_spec_id(0)
325327
.content(DataContentType::Data)
326328
.file_path("test/3.parquet".to_string())
327329
.file_format(DataFileFormat::Parquet)

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ impl<'a> SnapshotProduceAction<'a> {
168168
fn new_manifest_writer(
169169
&mut self,
170170
content_type: &ManifestContentType,
171+
partition_spec_id: i32,
171172
) -> Result<ManifestWriter> {
172173
let new_manifest_path = format!(
173174
"{}/{}/{}-m{}.{}",
@@ -182,17 +183,26 @@ impl<'a> SnapshotProduceAction<'a> {
182183
.current_table
183184
.file_io()
184185
.new_output(new_manifest_path)?;
186+
let partition_spec = self
187+
.tx
188+
.current_table
189+
.metadata()
190+
.partition_spec_by_id(partition_spec_id)
191+
.ok_or_else(|| {
192+
Error::new(
193+
ErrorKind::DataInvalid,
194+
"Invalid partition spec id for new manifest writer",
195+
)
196+
.with_context("partition spec id", partition_spec_id.to_string())
197+
})?
198+
.as_ref()
199+
.clone();
185200
let builder = ManifestWriterBuilder::new(
186201
output,
187202
Some(self.snapshot_id),
188203
self.key_metadata.clone(),
189204
self.tx.current_table.metadata().current_schema().clone(),
190-
self.tx
191-
.current_table
192-
.metadata()
193-
.default_partition_spec()
194-
.as_ref()
195-
.clone(),
205+
partition_spec,
196206
);
197207
if self.tx.current_table.metadata().format_version() == FormatVersion::V1 {
198208
Ok(builder.build_v1())
@@ -244,29 +254,95 @@ impl<'a> SnapshotProduceAction<'a> {
244254
builder.build()
245255
}
246256
});
247-
let mut writer = self.new_manifest_writer(&content_type)?;
257+
let mut writer = self.new_manifest_writer(
258+
&content_type,
259+
self.tx.current_table.metadata().default_partition_spec_id(),
260+
)?;
248261
for entry in manifest_entries {
249262
writer.add_entry(entry)?;
250263
}
251264
writer.write_manifest_file().await
252265
}
253266

267+
async fn write_delete_manifest(
268+
&mut self,
269+
deleted_entries: Vec<ManifestEntry>,
270+
) -> Result<Vec<ManifestFile>> {
271+
if deleted_entries.is_empty() {
272+
return Ok(vec![]);
273+
}
274+
275+
// Group deleted entries by spec_id
276+
let mut partition_groups = HashMap::new();
277+
for entry in deleted_entries {
278+
partition_groups
279+
.entry(entry.data_file().partition_spec_id)
280+
.or_insert_with(Vec::new)
281+
.push(entry);
282+
}
283+
284+
// Write a delete manifest per spec_id group
285+
let mut deleted_manifests = Vec::new();
286+
for (spec_id, entries) in partition_groups {
287+
let mut data_file_writer: Option<ManifestWriter> = None;
288+
let mut delete_file_writer: Option<ManifestWriter> = None;
289+
for entry in entries {
290+
match entry.content_type() {
291+
DataContentType::Data => {
292+
if data_file_writer.is_none() {
293+
data_file_writer = Some(
294+
self.new_manifest_writer(&ManifestContentType::Data, spec_id)?,
295+
);
296+
}
297+
data_file_writer.as_mut().unwrap().add_delete_entry(entry)?;
298+
}
299+
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
300+
if delete_file_writer.is_none() {
301+
delete_file_writer = Some(
302+
self.new_manifest_writer(&ManifestContentType::Deletes, spec_id)?,
303+
);
304+
}
305+
delete_file_writer
306+
.as_mut()
307+
.unwrap()
308+
.add_delete_entry(entry)?;
309+
}
310+
}
311+
}
312+
if let Some(writer) = data_file_writer {
313+
deleted_manifests.push(writer.write_manifest_file().await?);
314+
}
315+
if let Some(writer) = delete_file_writer {
316+
deleted_manifests.push(writer.write_manifest_file().await?);
317+
}
318+
}
319+
320+
Ok(deleted_manifests)
321+
}
322+
254323
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
255324
&mut self,
256325
snapshot_produce_operation: &OP,
257326
manifest_process: &MP,
258327
) -> Result<Vec<ManifestFile>> {
259328
let mut manifest_files = vec![];
260329
let data_files = std::mem::take(&mut self.added_data_files);
261-
let delete_files = std::mem::take(&mut self.added_delete_files);
330+
let added_delete_files = std::mem::take(&mut self.added_delete_files);
262331
if !data_files.is_empty() {
263332
let added_manifest = self.write_added_manifest(data_files).await?;
264333
manifest_files.push(added_manifest);
265334
}
266-
if !delete_files.is_empty() {
267-
let added_delete_manifest = self.write_added_manifest(delete_files).await?;
335+
336+
if !added_delete_files.is_empty() {
337+
let added_delete_manifest = self.write_added_manifest(added_delete_files).await?;
268338
manifest_files.push(added_delete_manifest);
269339
}
340+
341+
let delete_manifests = self
342+
.write_delete_manifest(snapshot_produce_operation.delete_entries(self).await?)
343+
.await?;
344+
manifest_files.extend(delete_manifests);
345+
270346
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
271347

272348
manifest_files.extend(existing_manifests);
@@ -505,7 +581,7 @@ impl MergeManifestManager {
505581
Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
506582
>)
507583
} else {
508-
let writer = snapshot_produce.new_manifest_writer(&self.content)?;
584+
let writer = snapshot_produce.new_manifest_writer(&self.content,snapshot_produce.tx.current_table.metadata().default_partition_spec_id())?;
509585
let snapshot_id = snapshot_produce.snapshot_id;
510586
let file_io = snapshot_produce.tx.current_table.file_io().clone();
511587
Ok((Box::pin(async move {

crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,22 @@ pub struct SortPositionDeleteWriterBuilder<B: FileWriterBuilder> {
3535
inner: B,
3636
cache_num: usize,
3737
partition_value: Option<Struct>,
38+
partition_spec_id: Option<i32>,
3839
}
3940

4041
impl<B: FileWriterBuilder> SortPositionDeleteWriterBuilder<B> {
4142
/// Create a new `SortPositionDeleteWriterBuilder` using a `FileWriterBuilder`.
42-
pub fn new(inner: B, cache_num: usize, partition_value: Option<Struct>) -> Self {
43+
pub fn new(
44+
inner: B,
45+
cache_num: usize,
46+
partition_value: Option<Struct>,
47+
partition_spec_id: Option<i32>,
48+
) -> Self {
4349
Self {
4450
inner,
4551
cache_num,
4652
partition_value,
53+
partition_spec_id,
4754
}
4855
}
4956
}
@@ -86,6 +93,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput, Vec<DataFil
8693
cache: BTreeMap::new(),
8794
data_files: Vec::new(),
8895
partition_value: self.partition_value.unwrap_or(Struct::empty()),
96+
partition_spec_id: self.partition_spec_id.unwrap_or(0),
8997
})
9098
}
9199
}
@@ -106,6 +114,7 @@ pub struct SortPositionDeleteWriter<B: FileWriterBuilder> {
106114
cache: BTreeMap<String, Vec<i64>>,
107115
data_files: Vec<DataFile>,
108116
partition_value: Struct,
117+
partition_spec_id: i32,
109118
}
110119

111120
impl<B: FileWriterBuilder> SortPositionDeleteWriter<B> {
@@ -140,6 +149,7 @@ impl<B: FileWriterBuilder> SortPositionDeleteWriter<B> {
140149
.extend(writer.close().await?.into_iter().map(|mut res| {
141150
res.content(crate::spec::DataContentType::PositionDeletes);
142151
res.partition(self.partition_value.clone());
152+
res.partition_spec_id(self.partition_spec_id);
143153
res.build().expect("Guaranteed to be valid")
144154
}));
145155
Ok(())
@@ -204,7 +214,7 @@ mod test {
204214
location_gen,
205215
file_name_gen,
206216
);
207-
let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None)
217+
let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None, None)
208218
.build()
209219
.await?;
210220

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ impl ParquetWriter {
438438
.file_path(file_path)
439439
.file_format(DataFileFormat::Parquet)
440440
.partition(Struct::empty())
441+
.partition_spec_id(0)
441442
.record_count(metadata.file_metadata().num_rows() as u64)
442443
.file_size_in_bytes(written_size as u64)
443444
.column_sizes(column_sizes)
@@ -1510,7 +1511,7 @@ mod tests {
15101511
// .next()
15111512
// .unwrap()
15121513
// .content(crate::spec::DataContentType::Data)
1513-
// .partition(Struct::empty())
1514+
// .partition(Struct::empty()).partition_spec_id(0)
15141515
// .build()
15151516
// .unwrap();
15161517
// assert_eq!(

crates/iceberg/src/writer/function_writer/equality_delta_writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ mod test {
318318
location_gen.clone(),
319319
file_name_gen.clone(),
320320
);
321-
DataFileWriterBuilder::new(pw.clone(), None)
321+
DataFileWriterBuilder::new(pw.clone(), None, None)
322322
};
323323
let position_delete_writer_builder = {
324324
let pw = ParquetWriterBuilder::new(
@@ -328,10 +328,10 @@ mod test {
328328
location_gen.clone(),
329329
file_name_gen.clone(),
330330
);
331-
SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None)
331+
SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None, None)
332332
};
333333
let equality_delete_writer_builder = {
334-
let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None)?;
334+
let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None, None)?;
335335
let pw = ParquetWriterBuilder::new(
336336
WriterProperties::builder().build(),
337337
arrow_schema_to_schema(config.projected_arrow_schema_ref())

crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ impl<B: IcebergWriterBuilder> IcebergWriter for FanoutPartitionWriter<B> {
134134
let mut data_files = writer.close().await?;
135135
for data_file in data_files.iter_mut() {
136136
data_file.rewrite_partition(partition_value.clone());
137+
data_file.rewrite_partition_id(self.partition_splitter.partition_spec().spec_id());
137138
}
138139
result.append(&mut data_files);
139140
}
@@ -211,7 +212,7 @@ mod test {
211212
location_gen,
212213
file_name_gen,
213214
);
214-
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None);
215+
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None, 0);
215216
let mut fanout_partition_writer = FanoutPartitionWriterBuilder::new(
216217
data_file_writer_builder,
217218
Arc::new(partition_spec),

crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ mod test {
207207
location_gen,
208208
file_name_gen,
209209
);
210-
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None);
210+
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None, 0);
211211
let mut precompute_partition_writer = PrecomputePartitionWriterBuilder::new(
212212
data_file_writer_builder,
213213
Arc::new(partition_spec),

crates/integration_tests/tests/shared_tests/merge_append_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ async fn write_new_data_file(table: &Table) -> Vec<DataFile> {
6363
location_generator.clone(),
6464
file_name_generator.clone(),
6565
);
66-
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
66+
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
6767
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
6868
let col1 = StringArray::from(vec![Some("foo"); 100]);
6969
let col2 = Int32Array::from(vec![Some(1); 100]);

0 commit comments

Comments
 (0)