
Commit c9da916

fix: cherry-pick #27 (#41)
1 parent: e6bb6b8

8 files changed: +90 -30 lines

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@ impl CachingDeleteFileManager {
         }
     }
 
+    #[allow(dead_code)]
     pub(crate) async fn load_deletes(
         &self,
         delete_file_entries: Vec<FileScanTaskDeleteFile>,
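The attribute is needed because the reader.rs hunk below removes the only call site of load_deletes; without it, rustc's dead_code lint would flag the now-uncalled crate-private method. A minimal sketch of that lint behavior, with a hypothetical module standing in for the real one:

    // Crate-private items with no callers trip the dead_code lint;
    // the attribute silences it until a call site is restored.
    mod cache {
        pub(crate) struct Manager;

        impl Manager {
            #[allow(dead_code)]
            pub(crate) fn load_deletes(&self) {}
        }
    }

    fn main() {
        let _m = cache::Manager;
    }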

crates/iceberg/src/arrow/reader.rs

Lines changed: 18 additions & 15 deletions
@@ -39,8 +39,8 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
+use super::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::delete_file_manager::CachingDeleteFileManager;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::delete_vector::DeleteVector;
 use crate::error::Result;
@@ -50,7 +50,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
@@ -176,14 +176,12 @@ impl ArrowReader {
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
 
         // concurrently retrieve delete files and create RecordBatchStreamBuilder
-        let (_, mut record_batch_stream_builder) = try_join!(
-            delete_file_manager.load_deletes(task.deletes.clone()),
-            Self::create_parquet_record_batch_stream_builder(
-                &task.data_file_path,
-                file_io.clone(),
-                should_load_page_index,
-            )
-        )?;
+        let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
+            &task.data_file_path,
+            file_io.clone(),
+            should_load_page_index,
+        )
+        .await?;
 
         // Create a projection mask for the batch stream to select which columns in the
         // Parquet file that we want in the response
@@ -305,13 +303,16 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let record_batch_stream =
-            record_batch_stream_builder
-                .build()?
-                .map(move |batch| match batch {
+        let record_batch_stream = record_batch_stream_builder.build()?.map(move |batch| {
+            if matches!(task.data_file_content, DataContentType::PositionDeletes) {
+                Ok(batch?)
+            } else {
+                match batch {
                     Ok(batch) => record_batch_transformer.process_record_batch(batch),
                     Err(err) => Err(err.into()),
-                });
+                }
+            }
+        });
 
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
@@ -1520,6 +1521,8 @@ message schema {
             project_field_ids: vec![1],
             predicate: Some(predicate.bind(schema, true).unwrap()),
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         })]
         .into_iter(),
     )) as FileScanTaskStream;
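The key behavioral change in this file: record batches read from a position-delete file now bypass the RecordBatchTransformer (which remaps batches to the task's snapshot schema) and are passed through as-is. A simplified, self-contained sketch of that branch; all types here are stand-ins, not the crate's real ones:

    // Stand-in types modeling the new map closure in the reader.
    enum DataContentType {
        Data,
        PositionDeletes,
    }

    struct Batch(&'static str);

    struct RecordBatchTransformer;

    impl RecordBatchTransformer {
        fn process_record_batch(&self, _batch: Batch) -> Batch {
            Batch("remapped to snapshot schema")
        }
    }

    fn map_batch(content: DataContentType, t: &RecordBatchTransformer, batch: Batch) -> Batch {
        // Position-delete batches keep the file's own schema; everything
        // else is remapped, mirroring the if/else added above.
        match content {
            DataContentType::PositionDeletes => batch,
            DataContentType::Data => t.process_record_batch(batch),
        }
    }

    fn main() {
        let t = RecordBatchTransformer;
        assert_eq!(
            map_batch(DataContentType::Data, &t, Batch("raw")).0,
            "remapped to snapshot schema"
        );
        assert_eq!(map_batch(DataContentType::PositionDeletes, &t, Batch("raw")).0, "raw");
    }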

crates/iceberg/src/delete_file_index.rs

Lines changed: 8 additions & 7 deletions
@@ -26,7 +26,7 @@ use futures::channel::mpsc::{channel, Sender};
 use futures::StreamExt;
 
 use crate::runtime::spawn;
-use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::scan::{DeleteFileContext, FileScanTask};
 use crate::spec::{DataContentType, DataFile, Struct};
 use crate::{Error, ErrorKind, Result};
 
@@ -118,10 +118,11 @@ impl PopulatedDeleteFileIndex {
         // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
         if partition.fields().is_empty() {
             // TODO: confirm we're good to skip here if we encounter a pos del
-            if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
-                global_deletes.push(arc_ctx);
-                return;
-            }
+            // FIXME(Dylan): allow putting position delete to global deletes.
+            // if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
+            global_deletes.push(arc_ctx);
+            return;
+            // }
         }
 
         let destination_map = match arc_ctx.manifest_entry.content_type() {
@@ -150,7 +151,7 @@ impl PopulatedDeleteFileIndex {
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
         let mut results = vec![];
 
         self.global_deletes
@@ -203,7 +204,7 @@ pub(crate) struct DeletesForDataFile<'a> {
 }
 
 impl Future for DeletesForDataFile<'_> {
-    type Output = Result<Vec<FileScanTaskDeleteFile>>;
+    type Output = Result<Vec<FileScanTask>>;
 
     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
         match self.state.try_read() {
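With the FIXME in place, every delete file written under an unpartitioned spec is now routed to the global-delete list, position deletes included (previously only equality deletes were). A simplified sketch of that routing decision, using stand-in types rather than the crate's:

    #[derive(Debug)]
    enum DeleteKind {
        PositionDeletes,
        EqualityDeletes,
    }

    struct DeleteCtx {
        kind: DeleteKind,
        partition_field_count: usize, // 0 => unpartitioned spec
    }

    // Mirrors the changed branch: unpartitioned deletes of either kind
    // now land in the global list before any partition-keyed map.
    fn route(ctx: DeleteCtx, global: &mut Vec<DeleteCtx>, partitioned: &mut Vec<DeleteCtx>) {
        if ctx.partition_field_count == 0 {
            global.push(ctx);
            return;
        }
        partitioned.push(ctx);
    }

    fn main() {
        let (mut global, mut partitioned) = (vec![], vec![]);
        route(
            DeleteCtx { kind: DeleteKind::PositionDeletes, partition_field_count: 0 },
            &mut global,
            &mut partitioned,
        );
        route(
            DeleteCtx { kind: DeleteKind::EqualityDeletes, partition_field_count: 2 },
            &mut global,
            &mut partitioned,
        );
        println!("global: {:?}", global.iter().map(|c| &c.kind).collect::<Vec<_>>());
        println!("partitioned: {:?}", partitioned.iter().map(|c| &c.kind).collect::<Vec<_>>());
    }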

crates/iceberg/src/scan/context.rs

Lines changed: 2 additions & 0 deletions
@@ -140,6 +140,8 @@ impl ManifestEntryContext {
                 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
             deletes,
+            sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
         })
     }
 }
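The new sequence_number field (defaulted to 0 when the manifest entry carries none) is what lets a reader decide whether a given delete applies to a given data file. Per the Iceberg spec, an equality delete applies only to data files with a strictly lower data sequence number, while a position delete applies at lower-or-equal. A sketch of those two predicates, with hypothetical helper names that are not the crate's API:

    // Comparison rules from the Iceberg spec's delete-file scoping;
    // the function names here are illustrative only.
    fn equality_delete_applies(data_seq: i64, delete_seq: i64) -> bool {
        data_seq < delete_seq
    }

    fn position_delete_applies(data_seq: i64, delete_seq: i64) -> bool {
        data_seq <= delete_seq
    }

    fn main() {
        assert!(equality_delete_applies(3, 4)); // older data, newer delete
        assert!(!equality_delete_applies(4, 4)); // same commit: not applied
        assert!(position_delete_applies(4, 4)); // same commit: applied
    }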

crates/iceberg/src/scan/mod.rs

Lines changed: 6 additions & 0 deletions
@@ -605,6 +605,8 @@ impl TableScan {
                 .send(DeleteFileContext {
                     manifest_entry: manifest_entry_context.manifest_entry.clone(),
                     partition_spec_id: manifest_entry_context.partition_spec_id,
+                    snapshot_schema: manifest_entry_context.snapshot_schema.clone(),
+                    field_ids: manifest_entry_context.field_ids.clone(),
                 })
                 .await?;
 
@@ -1791,6 +1793,8 @@ pub mod tests {
             record_count: Some(100),
             data_file_format: DataFileFormat::Parquet,
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         };
         test_fn(task);
 
@@ -1806,6 +1810,8 @@ pub mod tests {
             record_count: None,
             data_file_format: DataFileFormat::Avro,
             deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
         };
         test_fn(task);
     }

crates/iceberg/src/scan/task.rs

Lines changed: 30 additions & 1 deletion
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
 use serde::{Deserialize, Serialize};
 
@@ -56,7 +58,11 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 
     /// The list of delete files that may need to be applied to this data file
-    pub deletes: Vec<FileScanTaskDeleteFile>,
+    pub deletes: Vec<FileScanTask>,
+    /// sequence number
+    pub sequence_number: i64,
+    /// equality ids
+    pub equality_ids: Vec<i32>,
 }
 
 impl FileScanTask {
@@ -90,6 +96,8 @@ impl FileScanTask {
 pub(crate) struct DeleteFileContext {
     pub(crate) manifest_entry: ManifestEntryRef,
     pub(crate) partition_spec_id: i32,
+    pub(crate) snapshot_schema: SchemaRef,
+    pub(crate) field_ids: Arc<Vec<i32>>,
 }
 
 impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
@@ -102,6 +110,27 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
     }
 }
 
+impl From<&DeleteFileContext> for FileScanTask {
+    fn from(ctx: &DeleteFileContext) -> Self {
+        FileScanTask {
+            start: 0,
+            length: ctx.manifest_entry.file_size_in_bytes(),
+            record_count: Some(ctx.manifest_entry.record_count()),
+
+            data_file_path: ctx.manifest_entry.file_path().to_string(),
+            data_file_content: ctx.manifest_entry.content_type(),
+            data_file_format: ctx.manifest_entry.file_format(),
+
+            schema: ctx.snapshot_schema.clone(),
+            project_field_ids: ctx.field_ids.to_vec(),
+            predicate: None,
+            deletes: vec![],
+            sequence_number: ctx.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: ctx.manifest_entry.data_file().equality_ids().to_vec(),
+        }
+    }
+}
+
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTaskDeleteFile {
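This is the structural heart of the cherry-pick: deletes on FileScanTask is now a Vec<FileScanTask> rather than Vec<FileScanTaskDeleteFile>, so a delete file is described by the same task type as the data file it applies to and can flow through the same Arrow read path. A trimmed-down model of the recursive shape (stand-in struct carrying only a few of the real fields):

    struct FileScanTask {
        data_file_path: String,
        sequence_number: i64,
        equality_ids: Vec<i32>,
        // A delete file is itself a FileScanTask, built by the
        // From<&DeleteFileContext> impl above with an empty deletes list.
        deletes: Vec<FileScanTask>,
    }

    fn main() {
        let eq_delete = FileScanTask {
            data_file_path: "s3://warehouse/deletes/eq-del-1.parquet".to_string(),
            sequence_number: 5,
            equality_ids: vec![1, 2], // column ids the equality delete matches on
            deletes: vec![],
        };
        let data_task = FileScanTask {
            data_file_path: "s3://warehouse/data/file-a.parquet".to_string(),
            sequence_number: 3,
            equality_ids: vec![],
            deletes: vec![eq_delete],
        };
        println!(
            "{} (seq {}) carries {} delete task(s); first matches on columns {:?}",
            data_task.data_file_path,
            data_task.sequence_number,
            data_task.deletes.len(),
            data_task.deletes[0].equality_ids,
        );
    }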

crates/iceberg/src/writer/function_writer/equality_delta_writer.rs

Lines changed: 24 additions & 7 deletions
@@ -255,12 +255,14 @@ where
 
 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use arrow_select::concat::concat_batches;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
@@ -318,7 +320,7 @@ mod test {
                 location_gen.clone(),
                 file_name_gen.clone(),
             );
-            DataFileWriterBuilder::new(pw.clone(), None, None)
+            DataFileWriterBuilder::new(pw.clone(), None, 0)
         };
         let position_delete_writer_builder = {
             let pw = ParquetWriterBuilder::new(
@@ -331,7 +333,7 @@ mod test {
             SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None, None)
         };
         let equality_delete_writer_builder = {
-            let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None, None)?;
+            let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None, 0)?;
             let pw = ParquetWriterBuilder::new(
                 WriterProperties::builder().build(),
                 arrow_schema_to_schema(config.projected_arrow_schema_ref())
@@ -355,9 +357,18 @@ mod test {
 
         // write data
         let schema = Arc::new(ArrowSchema::new(vec![
-            Field::new("id", DataType::Int64, true),
-            Field::new("data", DataType::Utf8, true),
-            Field::new("op", DataType::Int32, false),
+            Field::new("id", DataType::Int64, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                1.to_string(),
+            )])),
+            Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                2.to_string(),
+            )])),
+            Field::new("op", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                3.to_string(),
+            )])),
         ]));
         {
             let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]);
@@ -388,8 +399,14 @@ mod test {
         assert_eq!(data_files.len(), 3);
         // data file
         let data_schema = Arc::new(ArrowSchema::new(vec![
-            Field::new("id", DataType::Int64, true),
-            Field::new("data", DataType::Utf8, true),
+            Field::new("id", DataType::Int64, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                1.to_string(),
+            )])),
+            Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                2.to_string(),
+            )])),
         ]));
         let data_file = data_files
            .iter()
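The test schemas now attach Iceberg field IDs to the Arrow fields through metadata keyed by PARQUET_FIELD_ID_META_KEY, which is how the writers map Arrow columns back to Iceberg schema fields. The pattern stands alone; a sketch assuming arrow-schema and parquet as dependencies:

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    // Attach an Iceberg field ID to an Arrow field via metadata; this is the
    // same pattern the updated test applies to its id/data/op columns.
    fn with_field_id(field: Field, id: i32) -> Field {
        field.with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    }

    fn main() {
        let schema = Arc::new(ArrowSchema::new(vec![
            with_field_id(Field::new("id", DataType::Int64, true), 1),
            with_field_id(Field::new("data", DataType::Utf8, true), 2),
        ]));
        for field in schema.fields() {
            println!("{} -> {:?}", field.name(), field.metadata().get(PARQUET_FIELD_ID_META_KEY));
        }
    }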

crates/iceberg/src/writer/function_writer/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -17,5 +17,6 @@
 
 //! This module contains the functional writer.
 
+pub mod equality_delta_writer;
 pub mod fanout_partition_writer;
 pub mod precompute_partition_writer;
