Skip to content

Commit 9946641

Browse files
committed
refactor: scan plan now pure streams rather than channels, eliminating a source of deadlock
1 parent a3bf829 commit 9946641

File tree

6 files changed

+190
-322
lines changed

6 files changed

+190
-322
lines changed

Cargo.lock

Lines changed: 26 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ arrow-select = { version = "55" }
5252
arrow-string = { version = "55" }
5353
as-any = "0.3.2"
5454
async-std = "1.12"
55+
async-stream = "0.3"
5556
async-trait = "0.1.88"
5657
aws-config = "1.6.1"
5758
aws-sdk-glue = "1.39"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ arrow-select = { workspace = true }
5656
arrow-string = { workspace = true }
5757
as-any = { workspace = true }
5858
async-std = { workspace = true, optional = true, features = ["attributes"] }
59+
async-stream = { workspace = true }
5960
async-trait = { workspace = true }
6061
base64 = { workspace = true }
6162
bimap = { workspace = true }

crates/iceberg/src/delete_file_index.rs

Lines changed: 18 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -16,131 +16,45 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::ops::Deref;
20-
use std::sync::{Arc, RwLock};
19+
use std::sync::Arc;
2120

22-
use futures::StreamExt;
23-
use futures::channel::mpsc::{Sender, channel};
24-
use tokio::sync::Notify;
25-
26-
use crate::runtime::spawn;
2721
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
2822
use crate::spec::{DataContentType, DataFile, Struct};
2923

3024
/// Index of delete files
31-
#[derive(Debug, Clone)]
25+
#[derive(Debug, Default)]
3226
pub(crate) struct DeleteFileIndex {
33-
state: Arc<RwLock<DeleteFileIndexState>>,
34-
}
35-
36-
#[derive(Debug)]
37-
enum DeleteFileIndexState {
38-
Populating(Arc<Notify>),
39-
Populated(PopulatedDeleteFileIndex),
40-
}
41-
42-
#[derive(Debug)]
43-
struct PopulatedDeleteFileIndex {
4427
#[allow(dead_code)]
4528
global_deletes: Vec<Arc<DeleteFileContext>>,
4629
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
4730
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
48-
// TODO: do we need this?
49-
// pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
50-
5131
// TODO: Deletion Vector support
5232
}
5333

54-
impl DeleteFileIndex {
55-
/// create a new `DeleteFileIndex` along with the sender that populates it with delete files
56-
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
57-
// TODO: what should the channel limit be?
58-
let (tx, rx) = channel(10);
59-
let notify = Arc::new(Notify::new());
60-
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
61-
notify.clone(),
62-
)));
63-
let delete_file_stream = rx.boxed();
64-
65-
spawn({
66-
let state = state.clone();
67-
async move {
68-
let delete_files = delete_file_stream.collect::<Vec<_>>().await;
69-
70-
let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
71-
72-
{
73-
let mut guard = state.write().unwrap();
74-
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
75-
}
76-
notify.notify_waiters();
77-
}
78-
});
79-
80-
(DeleteFileIndex { state }, tx)
81-
}
82-
83-
/// Gets all the delete files that apply to the specified data file.
84-
pub(crate) async fn get_deletes_for_data_file(
85-
&self,
86-
data_file: &DataFile,
87-
seq_num: Option<i64>,
88-
) -> Vec<FileScanTaskDeleteFile> {
89-
let notifier = {
90-
let guard = self.state.read().unwrap();
91-
match *guard {
92-
DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
93-
DeleteFileIndexState::Populated(ref index) => {
94-
return index.get_deletes_for_data_file(data_file, seq_num);
95-
}
96-
}
97-
};
98-
99-
notifier.notified().await;
100-
101-
let guard = self.state.read().unwrap();
102-
match guard.deref() {
103-
DeleteFileIndexState::Populated(index) => {
104-
index.get_deletes_for_data_file(data_file, seq_num)
105-
}
106-
_ => unreachable!("Cannot be any other state than loaded"),
107-
}
108-
}
109-
}
110-
111-
impl PopulatedDeleteFileIndex {
112-
/// Creates a new populated delete file index from a list of delete file contexts, which
113-
/// allows for fast lookup when determining which delete files apply to a given data file.
114-
///
115-
/// 1. The partition information is extracted from each delete file's manifest entry.
116-
/// 2. If the partition is empty and the delete file is not a positional delete,
117-
/// it is added to the `global_deletes` vector
118-
/// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
119-
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
120-
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
121-
HashMap::default();
122-
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
123-
HashMap::default();
124-
125-
let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
126-
127-
files.into_iter().for_each(|ctx| {
34+
impl Extend<DeleteFileContext> for DeleteFileIndex {
35+
fn extend<T: IntoIterator<Item = DeleteFileContext>>(&mut self, iter: T) {
36+
// 1. The partition information is extracted from each delete file's manifest entry.
37+
// 2. If the partition is empty and the delete file is not a positional delete,
38+
// it is added to the `global_deletes` vector
39+
// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
40+
for ctx in iter {
12841
let arc_ctx = Arc::new(ctx);
12942

13043
let partition = arc_ctx.manifest_entry.data_file().partition();
13144

132-
// The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
45+
// The spec states that "Equality delete files stored with an unpartitioned spec
46+
// are applied as global deletes".
13347
if partition.fields().is_empty() {
13448
// TODO: confirm we're good to skip here if we encounter a pos del
13549
if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
136-
global_deletes.push(arc_ctx);
137-
return;
50+
self.global_deletes.push(arc_ctx);
51+
continue;
13852
}
13953
}
14054

14155
let destination_map = match arc_ctx.manifest_entry.content_type() {
142-
DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
143-
DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
56+
DataContentType::PositionDeletes => &mut self.pos_deletes_by_partition,
57+
DataContentType::EqualityDeletes => &mut self.eq_deletes_by_partition,
14458
_ => unreachable!(),
14559
};
14660

@@ -150,17 +64,13 @@ impl PopulatedDeleteFileIndex {
15064
entry.push(arc_ctx.clone());
15165
})
15266
.or_insert(vec![arc_ctx.clone()]);
153-
});
154-
155-
PopulatedDeleteFileIndex {
156-
global_deletes,
157-
eq_deletes_by_partition,
158-
pos_deletes_by_partition,
15967
}
16068
}
69+
}
16170

71+
impl DeleteFileIndex {
16272
/// Determine all the delete files that apply to the provided `DataFile`.
163-
fn get_deletes_for_data_file(
73+
pub(crate) fn get_deletes_for_data_file(
16474
&self,
16575
data_file: &DataFile,
16676
seq_num: Option<i64>,

0 commit comments

Comments
 (0)