feat: support incremental scan between 2 snapshots #1470

Open · wants to merge 4 commits into main

crates/iceberg/src/scan/context.rs (163 additions, 24 deletions)

@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::sync::Arc;

use futures::channel::mpsc::Sender;
use futures::{SinkExt, TryFutureExt};
use itertools::Itertools;

use crate::delete_file_index::DeleteFileIndex;
use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -28,11 +30,12 @@ use crate::scan::{
PartitionFilterCache,
};
use crate::spec::{
- ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
- TableMetadataRef,
+ DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
+ ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};

type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
/// Wraps a [`ManifestFile`] alongside the objects that are needed
/// to process it in a thread-safe manner
pub(crate) struct ManifestFileContext {
@@ -45,7 +48,11 @@ pub(crate) struct ManifestFileContext {
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
- delete_file_index: DeleteFileIndex,
+ delete_file_index: Option<DeleteFileIndex>,
+
+ /// Filters manifest entries.
+ /// Used for different kinds of scans, e.g. scanning only newly added data files without delete files.
+ filter_fn: Option<Arc<ManifestEntryFilterFn>>,
}
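
For context, the new `filter_fn` hook is just a shared predicate over manifest entries. A minimal sketch of how such a filter can be built, assuming a pre-computed set of snapshot ids (the helper `added_by_snapshots` is illustrative, not part of this diff; it mirrors the filter installed later in `build_manifest_file_contexts`):

use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical helper: keep only entries added by one of the given snapshots.
fn added_by_snapshots(snapshot_ids: HashSet<i64>) -> Arc<ManifestEntryFilterFn> {
    Arc::new(move |entry: &ManifestEntryRef| {
        entry
            .snapshot_id()
            .map(|id| snapshot_ids.contains(&id))
            // Entries without an explicit snapshot_id are kept.
            .unwrap_or(true)
    })
}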

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +65,7 @@ pub(crate) struct ManifestEntryContext {
pub bound_predicates: Option<Arc<BoundPredicates>>,
pub partition_spec_id: i32,
pub snapshot_schema: SchemaRef,
- pub delete_file_index: DeleteFileIndex,
+ pub delete_file_index: Option<DeleteFileIndex>,
}

impl ManifestFileContext {
@@ -74,12 +81,13 @@ impl ManifestFileContext {
mut sender,
expression_evaluator_cache,
delete_file_index,
- ..
+ filter_fn,
} = self;
+ let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true));

let manifest = object_cache.get_manifest(&manifest_file).await?;

- for manifest_entry in manifest.entries() {
+ for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) {
let manifest_entry_context = ManifestEntryContext {
// TODO: refactor to avoid the expensive ManifestEntry clone
manifest_entry: manifest_entry.clone(),
@@ -105,13 +113,16 @@ impl ManifestEntryContext {
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
/// created from it
pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
- let deletes = self
-     .delete_file_index
-     .get_deletes_for_data_file(
-         self.manifest_entry.data_file(),
-         self.manifest_entry.sequence_number(),
-     )
-     .await;
+ let deletes = if let Some(delete_file_index) = self.delete_file_index {
+     delete_file_index
+         .get_deletes_for_data_file(
+             self.manifest_entry.data_file(),
+             self.manifest_entry.sequence_number(),
+         )
+         .await
+ } else {
+     vec![]
+ };

Ok(FileScanTask {
start: 0,
@@ -150,6 +161,11 @@ pub(crate) struct PlanContext {
pub partition_filter_cache: Arc<PartitionFilterCache>,
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,

// For incremental scans: if `to_snapshot_id` is set, the scan is incremental.
// `from_snapshot_id` may be `None`, in which case the scan starts from the first snapshot.
pub from_snapshot_id: Option<i64>,
pub to_snapshot_id: Option<i64>,
}
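
Taken together, the two optional ids select the planning mode. A small sketch of the assumed semantics (`scan_mode` is a hypothetical helper, written here only to spell out the three cases):

fn scan_mode(from_snapshot_id: Option<i64>, to_snapshot_id: Option<i64>) -> &'static str {
    match (from_snapshot_id, to_snapshot_id) {
        // No upper bound: the existing, non-incremental scan path.
        (_, None) => "regular snapshot scan",
        // Upper bound only: appends from the first snapshot up to `to`, inclusive.
        (None, Some(_)) => "incremental scan from table start",
        // Both bounds: appends in the half-open range (`from`, `to`].
        (Some(_), Some(_)) => "incremental scan between two snapshots",
    }
}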

impl PlanContext {
@@ -181,23 +197,89 @@ impl PlanContext {
Ok(partition_filter)
}

- pub(crate) fn build_manifest_file_contexts(
+ pub(crate) async fn build_manifest_file_contexts(
&self,
manifest_list: Arc<ManifestList>,
tx_data: Sender<ManifestEntryContext>,
- delete_file_idx: DeleteFileIndex,
- delete_file_tx: Sender<ManifestEntryContext>,
+ delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
- let manifest_files = manifest_list.entries().iter();
let mut filter_fn: Option<Arc<ManifestEntryFilterFn>> = None;
let manifest_files = {
if let Some(to_snapshot_id) = self.to_snapshot_id {
// Incremental scan mode:
// Collect all files added between the two snapshots.
// - Data files in `Append` and `Overwrite` snapshots are included.
// - Delete files are ignored.
// - `Replace` snapshots (e.g., from compaction) are ignored.
//
// `to_snapshot_id` is inclusive; `from_snapshot_id` is exclusive.

// prevent misuse
assert!(
delete_file_idx_and_tx.is_none(),
"delete file is not supported in incremental scan mode"
);

let snapshots =
ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id)
.filter(|snapshot| {
matches!(
snapshot.summary().operation,
Operation::Append | Operation::Overwrite
)
})
.collect_vec();
let snapshot_ids: HashSet<i64> = snapshots
.iter()
.map(|snapshot| snapshot.snapshot_id())
.collect();

let mut manifest_files = vec![];
for snapshot in snapshots {
let manifest_list = self
.object_cache
.get_manifest_list(&snapshot, &self.table_metadata)
.await?;
for entry in manifest_list.entries() {
if !snapshot_ids.contains(&entry.added_snapshot_id) {
continue;
}
manifest_files.push(entry.clone());
}
}

filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| {
    matches!(entry.status(), ManifestStatus::Added)
        && matches!(entry.data_file().content_type(), DataContentType::Data)
        // Entries without a snapshot_id are included; their id is inherited
        // from the manifest that contains them.
        && entry
            .snapshot_id()
            .map(|id| snapshot_ids.contains(&id))
            .unwrap_or(true)
}));

manifest_files
} else {
let manifest_list = self.get_manifest_list().await?;
manifest_list.entries().to_vec()
}
};

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];

- for manifest_file in manifest_files {
-     let tx = if manifest_file.content == ManifestContentType::Deletes {
-         delete_file_tx.clone()
+ for manifest_file in &manifest_files {
+     let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
+         let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
+             continue;
+         };
+         (Some(delete_file_idx.clone()), tx.clone())
      } else {
-         tx_data.clone()
+         (
+             delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()),
+             tx_data.clone(),
+         )
};

let partition_bound_predicate = if self.predicate.is_some() {
@@ -225,7 +307,8 @@ impl PlanContext {
manifest_file,
partition_bound_predicate,
tx,
- delete_file_idx.clone(),
+ delete_file_idx,
+ filter_fn.clone(),
);

filtered_mfcs.push(Ok(mfc));
@@ -239,7 +322,8 @@
manifest_file: &ManifestFile,
partition_filter: Option<Arc<BoundPredicate>>,
sender: Sender<ManifestEntryContext>,
- delete_file_index: DeleteFileIndex,
+ delete_file_index: Option<DeleteFileIndex>,
+ filter_fn: Option<Arc<ManifestEntryFilterFn>>,
) -> ManifestFileContext {
let bound_predicates =
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -262,6 +346,61 @@ impl PlanContext {
field_ids: self.field_ids.clone(),
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
+ filter_fn,
}
}
}
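
At the call site, the intended use is an incremental table scan that plans only appended data files. A hypothetical caller-side sketch; the builder methods `from_snapshot_id` and `to_snapshot_id` are assumptions inferred from the `PlanContext` fields above, not signatures confirmed by this diff:

use futures::TryStreamExt;

// Plan the data files appended after `old_id` (exclusive) up to `new_id` (inclusive).
let scan = table
    .scan()
    .from_snapshot_id(old_id) // hypothetical builder method
    .to_snapshot_id(new_id)   // hypothetical builder method
    .build()?;
let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;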

struct Ancestors {
next: Option<SnapshotRef>,
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
}

impl Iterator for Ancestors {
type Item = SnapshotRef;

fn next(&mut self) -> Option<Self::Item> {
let snapshot = self.next.take()?;
let result = snapshot.clone();
self.next = snapshot
.parent_snapshot_id()
.and_then(|id| (self.get_snapshot)(id));
Some(result)
}
}
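
`Ancestors` is a small unfold over parent pointers: each call to `next` hands out the current snapshot and queues up its parent. A self-contained sketch of the traversal with a stand-in lookup table (the `HashMap` store is hypothetical; the real lookup is `TableMetadata::snapshot_by_id`, as used in `ancestors_of` below):

use std::collections::HashMap;

// Collect the ancestry chain of `head`, following parent ids through `store`.
fn ancestry_ids(head: SnapshotRef, store: HashMap<i64, SnapshotRef>) -> Vec<i64> {
    Ancestors {
        next: Some(head),
        get_snapshot: Box::new(move |id| store.get(&id).cloned()),
    }
    .map(|snapshot| snapshot.snapshot_id())
    .collect()
}

// For a chain where snapshot 3's parent is 2 and 2's parent is 1,
// starting at 3 yields [3, 2, 1].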

/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
fn ancestors_of(
table_metadata: &TableMetadataRef,
snapshot: i64,
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
let table_metadata = table_metadata.clone();
Box::new(Ancestors {
next: Some(snapshot.clone()),
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
})
} else {
Box::new(std::iter::empty())
}
}

/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
fn ancestors_between(
table_metadata: &TableMetadataRef,
latest_snapshot_id: i64,
oldest_snapshot_id: Option<i64>,
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
};

if latest_snapshot_id == oldest_snapshot_id {
return Box::new(std::iter::empty());
}

Box::new(
ancestors_of(table_metadata, latest_snapshot_id)
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
)
}
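
A worked example of the boundary semantics, assuming a linear history 1 <- 2 <- 3 <- 4 (parent on the left) held by a hypothetical `metadata: TableMetadataRef`:

// The `oldest` side is exclusive, the `latest` side inclusive.
let ids: Vec<i64> = ancestors_between(&metadata, 4, Some(1))
    .map(|s| s.snapshot_id())
    .collect(); // [4, 3, 2]; snapshot 1 is excluded

// Equal bounds produce an empty range.
assert!(ancestors_between(&metadata, 4, Some(4)).next().is_none());

// No lower bound walks all the way back to the root.
let all: Vec<i64> = ancestors_between(&metadata, 4, None)
    .map(|s| s.snapshot_id())
    .collect(); // [4, 3, 2, 1]

One caveat visible in the code: if `oldest_snapshot_id` is not actually an ancestor of `latest_snapshot_id`, the `take_while` condition never matches and the walk continues to the root.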