refactor: TableScan file plan generation now implemented purely in streams rather than channels #1486

Open · wants to merge 1 commit into main
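The title captures the whole shape of the change: rather than wiring producers and consumers together with mpsc channels, spawned tasks, and shared state, plan generation and the delete file index now operate directly on streams. A toy before/after sketch of the two styles (stand-in types and names, not this crate's actual API):

```rust
use futures::{StreamExt, channel::mpsc, stream};

// Stand-in for DeleteFileIndex: a plain container that is Default + Extend.
#[derive(Debug, Default)]
struct ToyIndex(Vec<String>);

impl Extend<String> for ToyIndex {
    fn extend<T: IntoIterator<Item = String>>(&mut self, iter: T) {
        self.0.extend(iter);
    }
}

#[tokio::main]
async fn main() {
    // Before (channel style): a producer sends into an mpsc channel and a
    // consumer drains it, which pushes the design toward shared state,
    // locks, and notification between tasks.
    let (mut tx, rx) = mpsc::channel::<String>(10);
    tx.try_send("a.parquet".into()).unwrap();
    drop(tx); // close the channel so the collect below can finish
    let before: ToyIndex = rx.collect().await;

    // After (stream style): because the index is Default + Extend, it can be
    // collected straight from the stream of delete files, no channel needed.
    let after: ToyIndex = stream::iter(["b.parquet".to_string()]).collect().await;

    println!("{before:?} {after:?}");
}
```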
29 changes: 26 additions & 3 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -52,6 +52,7 @@ arrow-select = { version = "55" }
arrow-string = { version = "55" }
as-any = "0.3.2"
async-std = "1.12"
async-stream = "0.3"
async-trait = "0.1.88"
aws-config = "1.6.1"
aws-sdk-glue = "1.39"
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -56,6 +56,7 @@ arrow-select = { workspace = true }
arrow-string = { workspace = true }
as-any = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
bimap = { workspace = true }
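The new async-stream dependency is what makes the channel-free style practical: its macros turn straight-line async code into a `Stream`, so the planner can yield file scan tasks as it discovers them instead of sending them through a channel. A minimal sketch of that pattern, with a toy task type standing in for the real planning logic:

```rust
use async_stream::try_stream;
use futures::{Stream, StreamExt, pin_mut};

// Toy stand-in for a plan-files routine: yields tasks straight from async
// control flow. The item type and names here are illustrative only.
fn plan_tasks(n: u32) -> impl Stream<Item = Result<String, std::io::Error>> {
    try_stream! {
        for i in 0..n {
            // The real planner would await manifest reads and apply filters
            // here before yielding each task.
            yield format!("task-{i}");
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let tasks = plan_tasks(3);
    pin_mut!(tasks);
    while let Some(task) = tasks.next().await {
        println!("{}", task?);
    }
    Ok(())
}
```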
126 changes: 18 additions & 108 deletions crates/iceberg/src/delete_file_index.rs
@@ -16,131 +16,45 @@
// under the License.

use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::sync::Arc;

use futures::StreamExt;
use futures::channel::mpsc::{Sender, channel};
use tokio::sync::Notify;

use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};

/// Index of delete files
#[derive(Debug, Clone)]
#[derive(Debug, Default)]
pub(crate) struct DeleteFileIndex {
state: Arc<RwLock<DeleteFileIndexState>>,
}

#[derive(Debug)]
enum DeleteFileIndexState {
Populating(Arc<Notify>),
Populated(PopulatedDeleteFileIndex),
}

#[derive(Debug)]
struct PopulatedDeleteFileIndex {
#[allow(dead_code)]
global_deletes: Vec<Arc<DeleteFileContext>>,
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
// TODO: do we need this?
// pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,

// TODO: Deletion Vector support
}

impl DeleteFileIndex {
/// create a new `DeleteFileIndex` along with the sender that populates it with delete files
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
// TODO: what should the channel limit be?
let (tx, rx) = channel(10);
let notify = Arc::new(Notify::new());
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
notify.clone(),
)));
let delete_file_stream = rx.boxed();

spawn({
let state = state.clone();
async move {
let delete_files = delete_file_stream.collect::<Vec<_>>().await;

let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);

{
let mut guard = state.write().unwrap();
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
}
notify.notify_waiters();
}
});

(DeleteFileIndex { state }, tx)
}

/// Gets all the delete files that apply to the specified data file.
pub(crate) async fn get_deletes_for_data_file(
&self,
data_file: &DataFile,
seq_num: Option<i64>,
) -> Vec<FileScanTaskDeleteFile> {
let notifier = {
let guard = self.state.read().unwrap();
match *guard {
DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
DeleteFileIndexState::Populated(ref index) => {
return index.get_deletes_for_data_file(data_file, seq_num);
}
}
};

notifier.notified().await;

let guard = self.state.read().unwrap();
match guard.deref() {
DeleteFileIndexState::Populated(index) => {
index.get_deletes_for_data_file(data_file, seq_num)
}
_ => unreachable!("Cannot be any other state than loaded"),
}
}
}

impl PopulatedDeleteFileIndex {
/// Creates a new populated delete file index from a list of delete file contexts, which
/// allows for fast lookup when determining which delete files apply to a given data file.
///
/// 1. The partition information is extracted from each delete file's manifest entry.
/// 2. If the partition is empty and the delete file is not a positional delete,
/// it is added to the `global_deletes` vector
/// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();

let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];

files.into_iter().for_each(|ctx| {
impl Extend<DeleteFileContext> for DeleteFileIndex {
fn extend<T: IntoIterator<Item = DeleteFileContext>>(&mut self, iter: T) {
// 1. The partition information is extracted from each delete file's manifest entry.
// 2. If the partition is empty and the delete file is not a positional delete,
// it is added to the `global_deletes` vector
// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
for ctx in iter {
let arc_ctx = Arc::new(ctx);

let partition = arc_ctx.manifest_entry.data_file().partition();

// The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
// The spec states that "Equality delete files stored with an unpartitioned spec
// are applied as global deletes".
if partition.fields().is_empty() {
// TODO: confirm we're good to skip here if we encounter a pos del
if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
global_deletes.push(arc_ctx);
return;
self.global_deletes.push(arc_ctx);
continue;
}
}

let destination_map = match arc_ctx.manifest_entry.content_type() {
DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
DataContentType::PositionDeletes => &mut self.pos_deletes_by_partition,
DataContentType::EqualityDeletes => &mut self.eq_deletes_by_partition,
_ => unreachable!(),
};

@@ -150,17 +64,13 @@ impl PopulatedDeleteFileIndex
entry.push(arc_ctx.clone());
})
.or_insert(vec![arc_ctx.clone()]);
});

PopulatedDeleteFileIndex {
global_deletes,
eq_deletes_by_partition,
pos_deletes_by_partition,
}
}
}

impl DeleteFileIndex {
/// Determine all the delete files that apply to the provided `DataFile`.
fn get_deletes_for_data_file(
pub(crate) fn get_deletes_for_data_file(
&self,
data_file: &DataFile,
seq_num: Option<i64>,
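Net effect of the delete_file_index.rs changes, as far as this diff shows: `DeleteFileIndex` loses the `Populating`/`Populated` state machine, the `RwLock`, the `Notify`, and the spawned collector task, and becomes a plain `Default` container that implements `Extend<DeleteFileContext>`; lookup via `get_deletes_for_data_file` becomes an ordinary synchronous call. A hypothetical in-crate usage, relying only on the signatures visible in this diff (`contexts`, `data_file`, and `seq_num` are assumed to be in scope):

```rust
// Hypothetical usage inside the crate; the scaffolding around it is assumed.
let mut index = DeleteFileIndex::default();
index.extend(contexts); // contexts: impl IntoIterator<Item = DeleteFileContext>
let deletes = index.get_deletes_for_data_file(&data_file, seq_num);
```

The `Default` + `Extend` pair is also exactly what `futures::StreamExt::collect::<C>()` requires of its target collection, which would let the index be built directly from a stream of delete file contexts without any intermediate channel.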