Skip to content

Commit 1f4d225

Browse files
ZENOTMEZENOTME
authored and committed
support merge append (#29)
Co-authored-by: ZENOTME <st810918843@gmail.com> Signed-off-by: xxchan <xxchan22f@gmail.com>
1 parent 82725fd commit 1f4d225

File tree

7 files changed

+705
-38
lines changed

7 files changed

+705
-38
lines changed

crates/iceberg/src/spec/manifest/entry.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ impl ManifestEntry {
147147
pub fn data_file(&self) -> &DataFile {
148148
&self.data_file
149149
}
150+
151+
/// File sequence number indicating when the file was added. Inherited when null and status is 1 (added).
///
/// Returns the stored value as-is; `None` means no file sequence number is set on this entry.
152+
#[inline]
153+
pub fn file_sequence_number(&self) -> Option<i64> {
154+
self.file_sequence_number
155+
}
150156
}
151157

152158
/// Used to track additions and deletions in ManifestEntry.

crates/iceberg/src/transaction/append.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,22 @@ use uuid::Uuid;
2424
use crate::error::Result;
2525
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
2626
use crate::transaction::snapshot::{
27-
DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
27+
DefaultManifestProcess, MergeManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
2828
};
2929
use crate::transaction::Transaction;
3030
use crate::writer::file_writer::ParquetWriter;
3131
use crate::{Error, ErrorKind};
3232

33+
/// Table property key: target size, in bytes, of a manifest file when merging manifests.
34+
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
35+
/// Fallback for `MANIFEST_TARGET_SIZE_BYTES` when the property is missing or cannot be parsed.
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
36+
/// Table property key: minimum number of manifests to merge.
37+
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
38+
/// Fallback for `MANIFEST_MIN_MERGE_COUNT` when the property is missing or cannot be parsed.
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
39+
/// Table property key: whether merging manifests is allowed.
40+
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
41+
/// Fallback for `MANIFEST_MERGE_ENABLED` when the property is missing or cannot be parsed;
/// merging is off unless the table explicitly enables it.
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
42+
3343
/// FastAppendAction is a transaction action for fast append data files to the table.
3444
pub struct FastAppendAction<'a> {
3545
snapshot_produce_action: SnapshotProduceAction<'a>,
@@ -204,6 +214,84 @@ impl SnapshotProduceOperation for FastAppendOperation {
204214
}
205215
}
206216

217+
/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
218+
/// based on the target size.
///
/// The merge settings are captured once, at construction time, from the table properties.
219+
pub struct MergeAppendAction<'a> {
220+
/// Underlying snapshot-producing action that all calls are delegated to.
snapshot_produce_action: SnapshotProduceAction<'a>,
221+
/// Value of the `commit.manifest.target-size-bytes` table property (or its default).
target_size_bytes: u32,
222+
/// Value of the `commit.manifest.min-count-to-merge` table property (or its default).
min_count_to_merge: u32,
223+
/// Value of the `commit.manifest-merge.enabled` table property (or its default).
merge_enabled: bool,
224+
}
225+
226+
impl<'a> MergeAppendAction<'a> {
227+
/// Builds a merge append action on top of `tx`.
///
/// The three merge settings are read from the current table's properties. A property
/// that is absent, or whose value fails to parse, silently falls back to its default
/// constant rather than producing an error.
///
/// # Errors
///
/// Propagates any error returned by `SnapshotProduceAction::new`.
// NOTE(review): this function takes five parameters; the lint allowance below may be
// unnecessary — confirm against the project's clippy configuration.
#[allow(clippy::too_many_arguments)]
228+
pub(crate) fn new(
229+
tx: Transaction<'a>,
230+
snapshot_id: i64,
231+
commit_uuid: Uuid,
232+
key_metadata: Vec<u8>,
233+
snapshot_properties: HashMap<String, String>,
234+
) -> Result<Self> {
235+
// Read the settings before `tx` is moved into the inner action below.
let target_size_bytes: u32 = tx
236+
.current_table
237+
.metadata()
238+
.properties()
239+
.get(MANIFEST_TARGET_SIZE_BYTES)
240+
.and_then(|s| s.parse().ok())
241+
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
242+
let min_count_to_merge: u32 = tx
243+
.current_table
244+
.metadata()
245+
.properties()
246+
.get(MANIFEST_MIN_MERGE_COUNT)
247+
.and_then(|s| s.parse().ok())
248+
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
249+
let merge_enabled = tx
250+
.current_table
251+
.metadata()
252+
.properties()
253+
.get(MANIFEST_MERGE_ENABLED)
254+
.and_then(|s| s.parse().ok())
255+
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
256+
Ok(Self {
257+
// Note the argument order: `key_metadata` is passed before `commit_uuid` here,
// the reverse of this constructor's own parameter order.
snapshot_produce_action: SnapshotProduceAction::new(
258+
tx,
259+
snapshot_id,
260+
key_metadata,
261+
commit_uuid,
262+
snapshot_properties,
263+
)?,
264+
target_size_bytes,
265+
min_count_to_merge,
266+
merge_enabled,
267+
})
268+
}
269+
270+
/// Adds data files to the snapshot.
///
/// Delegates to the underlying snapshot produce action and returns `self` so calls can
/// be chained.
///
/// # Errors
///
/// Propagates any error returned by the underlying `add_data_files` call.
271+
pub fn add_data_files(
272+
&mut self,
273+
data_files: impl IntoIterator<Item = DataFile>,
274+
) -> Result<&mut Self> {
275+
self.snapshot_produce_action.add_data_files(data_files)?;
276+
Ok(self)
277+
}
278+
279+
/// Finishes building the action and applies it to the transaction.
///
/// Both branches use `FastAppendOperation`; only the manifest process differs. When
/// merging is enabled, a `MergeManifestProcess` configured with the target size and
/// minimum merge count is used, otherwise `DefaultManifestProcess`.
280+
pub async fn apply(self) -> Result<Transaction<'a>> {
281+
if self.merge_enabled {
282+
let process =
283+
MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);
284+
self.snapshot_produce_action
285+
.apply(FastAppendOperation, process)
286+
.await
287+
} else {
288+
self.snapshot_produce_action
289+
.apply(FastAppendOperation, DefaultManifestProcess)
290+
.await
291+
}
292+
}
293+
}
294+
207295
#[cfg(test)]
208296
mod tests {
209297
use crate::scan::tests::TableTestFixture;

crates/iceberg/src/transaction/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! This module contains transaction api.
1919
2020
mod append;
21-
pub mod remove_snapshots;
21+
mod remove_snapshots;
2222
mod snapshot;
2323
mod sort_order;
2424

@@ -27,13 +27,14 @@ use std::collections::HashMap;
2727
use std::mem::discriminant;
2828
use std::sync::Arc;
2929

30+
pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
3031
use remove_snapshots::RemoveSnapshotAction;
3132
use uuid::Uuid;
3233

3334
use crate::error::Result;
3435
use crate::spec::FormatVersion;
3536
use crate::table::Table;
36-
use crate::transaction::append::FastAppendAction;
37+
use crate::transaction::append::{FastAppendAction, MergeAppendAction};
3738
use crate::transaction::sort_order::ReplaceSortOrderAction;
3839
use crate::TableUpdate::UpgradeFormatVersion;
3940
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
@@ -184,6 +185,22 @@ impl<'a> Transaction<'a> {
184185
)
185186
}
186187

188+
/// Creates a merge append action.
///
/// Consumes the transaction. A fresh snapshot id is generated via
/// `generate_unique_snapshot_id`; when `commit_uuid` is `None`, a new UUID from
/// `Uuid::now_v7` is used. The action starts with an empty set of snapshot properties.
///
/// # Errors
///
/// Propagates any error returned by `MergeAppendAction::new`.
189+
pub fn merge_append(
190+
self,
191+
commit_uuid: Option<Uuid>,
192+
key_metadata: Vec<u8>,
193+
) -> Result<MergeAppendAction<'a>> {
194+
// Generate the id first: `self` is moved into the action on the next statement.
let snapshot_id = self.generate_unique_snapshot_id();
195+
MergeAppendAction::new(
196+
self,
197+
snapshot_id,
198+
commit_uuid.unwrap_or_else(Uuid::now_v7),
199+
key_metadata,
200+
HashMap::new(),
201+
)
202+
}
203+
187204
/// Creates replace sort order action.
188205
pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
189206
ReplaceSortOrderAction {

0 commit comments

Comments
 (0)