
Commit 41e727d

ZENOTME authored and xxchan committed
feat(writer): add sort position delete writer (#17)
Co-authored-by: ZENOTME <st810918843@gmail.com>
1 parent 849b4fc commit 41e727d

File tree

2 files changed: +281 -0

crates/iceberg/src/writer/base_writer/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -19,3 +19,4 @@
 
 pub mod data_file_writer;
 pub mod equality_delete_writer;
+pub mod sort_position_delete_writer;
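
With the module exported, downstream code can reach the new writer through the base writer path. A small sketch of the import (assuming the crate name `iceberg`, as used for the `crates/iceberg` package; the same path appears in the test imports below):

use iceberg::writer::base_writer::sort_position_delete_writer::{
    PositionDeleteInput, SortPositionDeleteWriterBuilder,
};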
crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs

Lines changed: 280 additions & 0 deletions

@@ -0,0 +1,280 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Sort position delete file writer.
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use once_cell::sync::Lazy;

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, NestedField, PrimitiveType, Schema, SchemaRef, Struct, Type};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

/// Builder for `SortPositionDeleteWriter`.
#[derive(Clone)]
pub struct SortPositionDeleteWriterBuilder<B: FileWriterBuilder> {
    inner: B,
    cache_num: usize,
    partition_value: Option<Struct>,
}

impl<B: FileWriterBuilder> SortPositionDeleteWriterBuilder<B> {
    /// Create a new `SortPositionDeleteWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B, cache_num: usize, partition_value: Option<Struct>) -> Self {
        Self {
            inner,
            cache_num,
            partition_value,
        }
    }
}

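// Note: 2147483546 and 2147483545 below are the reserved field IDs that the
// Iceberg spec assigns to the `file_path` and `pos` columns of position
// delete files (Integer.MAX_VALUE - 101 and Integer.MAX_VALUE - 102).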
/// Schema for position delete file.
pub static POSITION_DELETE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
    Arc::new(
        Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::required(
                    2147483546,
                    "file_path",
                    Type::Primitive(PrimitiveType::String),
                )),
                Arc::new(NestedField::required(
                    2147483545,
                    "pos",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap(),
    )
});

/// Arrow schema for position delete file.
pub static POSITION_DELETE_ARROW_SCHEMA: Lazy<ArrowSchemaRef> =
    Lazy::new(|| Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()));

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput, Vec<DataFile>>
    for SortPositionDeleteWriterBuilder<B>
{
    type R = SortPositionDeleteWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(SortPositionDeleteWriter {
            inner_writer_builder: self.inner.clone(),
            cache_num: self.cache_num,
            cache: BTreeMap::new(),
            data_files: Vec::new(),
            partition_value: self.partition_value.unwrap_or(Struct::empty()),
        })
    }
}

/// Position delete input.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct PositionDeleteInput {
    /// The path of the file.
    pub path: String,
    /// The offset of the position delete.
    pub offset: i64,
}

/// The sort position delete writer.
pub struct SortPositionDeleteWriter<B: FileWriterBuilder> {
    inner_writer_builder: B,
    cache_num: usize,
    cache: BTreeMap<String, Vec<i64>>,
    data_files: Vec<DataFile>,
    partition_value: Struct,
}

impl<B: FileWriterBuilder> SortPositionDeleteWriter<B> {
    /// Get the current number of cached file paths.
    pub fn current_cache_number(&self) -> usize {
        self.cache.len()
    }
}

impl<B: FileWriterBuilder> SortPositionDeleteWriter<B> {
    async fn write_cache_out(&mut self) -> Result<()> {
        let mut keys = Vec::new();
        let mut values = Vec::new();
        let mut cache = std::mem::take(&mut self.cache);
        for (key, offsets) in cache.iter_mut() {
            offsets.sort();
            let key_ref = key.as_str();
            for offset in offsets {
                keys.push(key_ref);
                values.push(*offset);
            }
        }
        let key_array = Arc::new(StringArray::from(keys)) as ArrayRef;
        let value_array = Arc::new(Int64Array::from(values)) as ArrayRef;
        let record_batch = RecordBatch::try_new(POSITION_DELETE_ARROW_SCHEMA.clone(), vec![
            key_array,
            value_array,
        ])?;
        let mut writer = self.inner_writer_builder.clone().build().await?;
        writer.write(&record_batch).await?;
        self.data_files
            .extend(writer.close().await?.into_iter().map(|mut res| {
                res.content(crate::spec::DataContentType::PositionDeletes);
                res.partition(self.partition_value.clone());
                res.build().expect("Guaranteed to be valid")
            }));
        Ok(())
    }
}

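// Note: `cache` is a `BTreeMap`, so file paths are drained in ascending
// order, and each path's offsets are sorted before emission; every flushed
// file is therefore ordered by `file_path`, then `pos`, which is the sort
// order the Iceberg spec requires for position delete files.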
/// Implement `IcebergWriter` for `SortPositionDeleteWriter`.
#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter<PositionDeleteInput> for SortPositionDeleteWriter<B> {
    async fn write(&mut self, input: PositionDeleteInput) -> Result<()> {
        if let Some(v) = self.cache.get_mut(&input.path) {
            v.push(input.offset);
        } else {
            self.cache
                .insert(input.path.to_string(), vec![input.offset]);
        }

        if self.cache.len() >= self.cache_num {
            self.write_cache_out().await?;
        }
        Ok(())
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        self.write_cache_out().await?;
        Ok(std::mem::take(&mut self.data_files))
    }
}

#[cfg(test)]
mod test {
    use arrow_array::{Int64Array, StringArray};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::POSITION_DELETE_SCHEMA;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataContentType, DataFileFormat, Struct};
    use crate::writer::base_writer::sort_position_delete_writer::{
        PositionDeleteInput, SortPositionDeleteWriterBuilder,
    };
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
    use crate::Result;

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new("memory").build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            POSITION_DELETE_SCHEMA.clone(),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None)
            .build()
            .await?;

        // Write some position delete inputs
        let mut inputs = [
            PositionDeleteInput {
                path: "file2.parquet".to_string(),
                offset: 2,
            },
            PositionDeleteInput {
                path: "file2.parquet".to_string(),
                offset: 1,
            },
            PositionDeleteInput {
                path: "file2.parquet".to_string(),
                offset: 3,
            },
            PositionDeleteInput {
                path: "file3.parquet".to_string(),
                offset: 2,
            },
            PositionDeleteInput {
                path: "file1.parquet".to_string(),
                offset: 5,
            },
            PositionDeleteInput {
                path: "file1.parquet".to_string(),
                offset: 4,
            },
            PositionDeleteInput {
                path: "file1.parquet".to_string(),
                offset: 1,
            },
        ];
        for input in inputs.iter() {
            position_delete_writer.write(input.clone()).await?;
        }

        let data_files = position_delete_writer.close().await.unwrap();
        assert_eq!(data_files.len(), 1);
        assert_eq!(data_files[0].file_format, DataFileFormat::Parquet);
        assert_eq!(data_files[0].content, DataContentType::PositionDeletes);
        assert_eq!(data_files[0].partition, Struct::empty());

        let parquet_file = file_io
            .new_input(&data_files[0].file_path)?
            .read()
            .await
            .unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap();
        let reader = builder.build().unwrap();
        let batches = reader.map(|x| x.unwrap()).collect::<Vec<_>>();

        let path_column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let offset_column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

        inputs.sort_by(|a, b| a.path.cmp(&b.path).then_with(|| a.offset.cmp(&b.offset)));
        for (i, input) in inputs.iter().enumerate() {
            assert_eq!(path_column.value(i), input.path);
            assert_eq!(offset_column.value(i), input.offset);
        }

        Ok(())
    }
}
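
For orientation, a minimal usage sketch mirroring the test above (`pw` stands for any configured `FileWriterBuilder`, such as the `ParquetWriterBuilder` built there; with a cache size of 10, the writer flushes automatically once 10 distinct file paths are cached, and `close` flushes whatever remains):

let mut writer = SortPositionDeleteWriterBuilder::new(pw, 10, None)
    .build()
    .await?;
writer
    .write(PositionDeleteInput {
        path: "file1.parquet".to_string(),
        offset: 42,
    })
    .await?;
let delete_files: Vec<DataFile> = writer.close().await?;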
