
Commit 7d794fa

Scan Delete Support Part 4: Delete File Loading; Skeleton for Processing (#982)
Extends the `DeleteFileManager` introduced in #950 to include loading of delete files, storage and retrieval of parsed delete files from shared state, and an outline of how parsing will connect to this new work. Issue: #630
1 parent a28a574 commit 7d794fa

File tree

12 files changed: +1079 -248 lines


Cargo.lock

Lines changed: 11 additions & 0 deletions
Lines changed: 324 additions & 0 deletions
@@ -0,0 +1,324 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use futures::{StreamExt, TryStreamExt};
use tokio::sync::oneshot::{Receiver, channel};

use super::delete_filter::DeleteFilter;
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
use crate::delete_vector::DeleteVector;
use crate::expr::Predicate;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, SchemaRef};
use crate::{Error, ErrorKind, Result};

#[derive(Clone, Debug)]
pub(crate) struct CachingDeleteFileLoader {
    basic_delete_file_loader: BasicDeleteFileLoader,
    concurrency_limit_data_files: usize,
}

// Intermediate context during processing of a delete file task.
enum DeleteFileContext {
    // TODO: Delete Vector loader from Puffin files
    ExistingEqDel,
    PosDels(ArrowRecordBatchStream),
    FreshEqDel {
        batch_stream: ArrowRecordBatchStream,
        sender: tokio::sync::oneshot::Sender<Predicate>,
    },
}

// Final result of the processing of a delete file task before
// results are fully merged into the DeleteFileManager's state
enum ParsedDeleteFileContext {
    DelVecs(HashMap<String, DeleteVector>),
    EqDel,
}

#[allow(unused_variables)]
impl CachingDeleteFileLoader {
    pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
        CachingDeleteFileLoader {
            basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
            concurrency_limit_data_files,
        }
    }

    /// Initiates loading of all deletes for all the specified tasks.
    ///
    /// The returned future completes once all positional deletes and delete vectors
    /// have been loaded. EQ deletes are not waited for in this method, but the returned
    /// DeleteFilter will await their loading when queried for them.
    ///
    /// * Create a single stream of all delete file tasks irrespective of type,
    ///   so that we can respect the combined concurrency limit.
    /// * We then process each in two phases: load and parse.
    /// * For positional deletes, the load phase instantiates an ArrowRecordBatchStream to
    ///   stream the file contents out.
    /// * For EQ deletes, we first check if the EQ delete is already loaded or being loaded by
    ///   another concurrently processing data file scan task. If it is, we skip it.
    ///   If not, the DeleteFilter is updated to contain a notifier to prevent other data file
    ///   tasks from starting to load the same equality delete file. We spawn a task to load
    ///   the EQ delete's record batch stream, convert it to a predicate, update the delete filter,
    ///   and notify any task that was waiting for it.
    /// * When this gets updated to add support for delete vectors, the load phase will return
    ///   a PuffinReader for them.
    /// * The parse phase parses each record batch stream according to its associated data type.
    ///   The result of this is a map of data file paths to delete vectors for the positional
    ///   delete tasks (and, in future, for the delete vector tasks). For equality delete
    ///   file tasks, this results in an unbound Predicate.
    /// * The unbound Predicates resulting from equality deletes are sent to their associated
    ///   oneshot channel to store them in the right place in the delete file manager's state.
    /// * The results of all of these futures are awaited on in parallel with the specified
    ///   level of concurrency and collected into a vec. We then combine all the delete
    ///   vector maps that resulted from any positional delete or delete vector files into a
    ///   single map and persist it in the state.
    ///
    ///
    /// Conceptually, the data flow is like this:
    /// ```none
    ///                                         FileScanTaskDeleteFile
    ///                                                    |
    ///                                         Skip Started EQ Deletes
    ///                                                    |
    ///                                                    |
    ///                                  [load recordbatch stream / puffin]
    ///                                            DeleteFileContext
    ///                                                    |
    ///                                                    |
    ///                     +-----------------------------+--------------------------+
    ///                  Pos Del         Del Vec (Not yet Implemented)             EQ Del
    ///                     |                             |                          |
    ///          [parse pos del stream]        [parse del vec puffin]         [parse eq del]
    ///     HashMap<String, RoaringTreeMap>  HashMap<String, RoaringTreeMap> (Predicate, Sender)
    ///                     |                             |                          |
    ///                     |                             |                 [persist to state]
    ///                     |                             |                          ()
    ///                     |                             |                          |
    ///                     +-----------------------------+--------------------------+
    ///                                                    |
    ///                                           [buffer unordered]
    ///                                                    |
    ///                                          [combine del vectors]
    ///                                      HashMap<String, RoaringTreeMap>
    ///                                                    |
    ///                                     [persist del vectors to state]
    ///                                                    ()
    ///                                                    |
    ///                                                    |
    ///                                                 [join!]
    /// ```
    pub(crate) fn load_deletes(
        &self,
        delete_file_entries: &[FileScanTaskDeleteFile],
        schema: SchemaRef,
    ) -> Receiver<Result<DeleteFilter>> {
        let (tx, rx) = channel();
        let del_filter = DeleteFilter::default();

        let stream_items = delete_file_entries
            .iter()
            .map(|t| {
                (
                    t.clone(),
                    self.basic_delete_file_loader.clone(),
                    del_filter.clone(),
                    schema.clone(),
                )
            })
            .collect::<Vec<_>>();
        let task_stream = futures::stream::iter(stream_items);

        let del_filter = del_filter.clone();
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let basic_delete_file_loader = self.basic_delete_file_loader.clone();
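        // The actual loading runs as a spawned background task so that this method can
        // return the oneshot Receiver immediately; the final Result<DeleteFilter> (or the
        // first error encountered) is sent back over `tx` once the work completes.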
        crate::runtime::spawn(async move {
            let result = async move {
                let mut del_filter = del_filter;
                let basic_delete_file_loader = basic_delete_file_loader.clone();

                let results: Vec<ParsedDeleteFileContext> = task_stream
                    .map(move |(task, file_io, del_filter, schema)| {
                        let basic_delete_file_loader = basic_delete_file_loader.clone();
                        async move {
                            Self::load_file_for_task(
                                &task,
                                basic_delete_file_loader.clone(),
                                del_filter,
                                schema,
                            )
                            .await
                        }
                    })
                    .map(move |ctx| {
                        Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
                    })
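                    // Load and parse run under a single combined concurrency limit;
                    // completed futures are collected in whatever order they finish.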
                    .try_buffer_unordered(concurrency_limit_data_files)
                    .try_collect::<Vec<_>>()
                    .await?;

                for item in results {
                    if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
                        for (data_file_path, delete_vector) in hash_map.into_iter() {
                            del_filter.upsert_delete_vector(data_file_path, delete_vector);
                        }
                    }
                }

                Ok(del_filter)
            }
            .await;

            let _ = tx.send(result);
        });

        rx
    }

    async fn load_file_for_task(
        task: &FileScanTaskDeleteFile,
        basic_delete_file_loader: BasicDeleteFileLoader,
        del_filter: DeleteFilter,
        schema: SchemaRef,
    ) -> Result<DeleteFileContext> {
        match task.file_type {
            DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                basic_delete_file_loader
                    .parquet_to_batch_stream(&task.file_path)
                    .await?,
            )),

            DataContentType::EqualityDeletes => {
                let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
                    return Ok(DeleteFileContext::ExistingEqDel);
                };

                let (sender, receiver) = channel();
                del_filter.insert_equality_delete(&task.file_path, receiver);

                Ok(DeleteFileContext::FreshEqDel {
                    batch_stream: BasicDeleteFileLoader::evolve_schema(
                        basic_delete_file_loader
                            .parquet_to_batch_stream(&task.file_path)
                            .await?,
                        schema,
                    )
                    .await?,
                    sender,
                })
            }

            DataContentType::Data => Err(Error::new(
                ErrorKind::Unexpected,
                "tasks with files of type Data not expected here",
            )),
        }
    }

    async fn parse_file_content_for_task(
        ctx: DeleteFileContext,
    ) -> Result<ParsedDeleteFileContext> {
        match ctx {
            DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
            DeleteFileContext::PosDels(batch_stream) => {
                let del_vecs =
                    Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
                Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
            }
            DeleteFileContext::FreshEqDel {
                sender,
                batch_stream,
            } => {
                let predicate =
                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;

                sender
                    .send(predicate)
                    .map_err(|err| {
                        Error::new(
                            ErrorKind::Unexpected,
                            "Could not send eq delete predicate to state",
                        )
                    })
                    .map(|_| ParsedDeleteFileContext::EqDel)
            }
        }
    }

    /// Parses a record batch stream coming from positional delete files
    ///
    /// Returns a map of data file path to a delete vector
    async fn parse_positional_deletes_record_batch_stream(
        stream: ArrowRecordBatchStream,
    ) -> Result<HashMap<String, DeleteVector>> {
        // TODO

        Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "parsing of positional deletes is not yet supported",
        ))
    }

    /// Parses record batch streams from individual equality delete files
    ///
    /// Returns an unbound Predicate for each batch stream
    async fn parse_equality_deletes_record_batch_stream(
        streams: ArrowRecordBatchStream,
    ) -> Result<Predicate> {
        // TODO

        Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "parsing of equality deletes is not yet supported",
        ))
    }
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_file_loader::tests::setup;

    #[tokio::test]
    async fn test_delete_file_manager_load_deletes() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();
        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
            .unwrap()
            .build()
            .unwrap();

        // Note that with the delete file parsing not yet in place, all we can test here is that
        // the call to the loader fails with the expected FeatureUnsupported error.
        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);

        let file_scan_tasks = setup(table_location);

        let result = delete_file_manager
            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
            .await
            .unwrap();

        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
    }
}
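For orientation, here is a minimal caller-side sketch of how a scan inside the crate might drive the new loader. It only uses pieces shown in the diff above (`CachingDeleteFileLoader::new`, `load_deletes`, and the `deletes`/`schema_ref()` accessors used by the test); the function and variable names are placeholders for this sketch, not part of the commit.

```rust
use crate::scan::FileScanTask; // not imported by the file above; needed only for this sketch

// Illustrative sketch only: a caller inside the crate kicking off delete loading
// for one scan task. `file_io` and `task` are assumed to come from the scan code.
async fn load_deletes_for_task(file_io: FileIO, task: &FileScanTask) -> Result<DeleteFilter> {
    let loader = CachingDeleteFileLoader::new(file_io, 10);

    // Loading starts immediately on the runtime; the oneshot Receiver resolves
    // once positional deletes have been processed. Equality deletes continue
    // loading in the background and the DeleteFilter awaits them lazily.
    let rx = loader.load_deletes(&task.deletes, task.schema_ref());

    // ... the caller could begin opening the data file here ...

    // Outer expect: the oneshot channel itself; the inner value is the loader's
    // Result. Until the parse functions are implemented, that inner Result is
    // the FeatureUnsupported error exercised by the test above.
    rx.await.expect("delete file loader task dropped its sender")
}
```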

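The two parse functions are still stubs. As a rough illustration of the direction the positional-delete TODO points in (not what this commit implements), a parse step could walk each record batch and group row positions by data file path, assuming the standard Iceberg position-delete layout of a `file_path` string column and a `pos` long column; the plain `Vec<u64>` map below stands in for the eventual `HashMap<String, DeleteVector>`.

```rust
// Hypothetical sketch, not part of this commit: one possible shape for parsing
// positional deletes. Column names follow the Iceberg spec for position delete
// files; DeleteVector is deliberately avoided here and replaced by a plain
// Vec<u64> of row positions per data file.
use std::collections::HashMap;

use arrow_array::{Int64Array, StringArray};
use futures::StreamExt;

use crate::scan::ArrowRecordBatchStream;
use crate::{Error, ErrorKind, Result};

async fn sketch_parse_positional_deletes(
    mut stream: ArrowRecordBatchStream,
) -> Result<HashMap<String, Vec<u64>>> {
    let mut positions_by_file: HashMap<String, Vec<u64>> = HashMap::new();

    while let Some(batch) = stream.next().await {
        let batch = batch?;

        // `file_path`: which data file the deleted row lives in.
        let file_paths = batch
            .column_by_name("file_path")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "missing file_path column"))?;

        // `pos`: the 0-based position of the deleted row within that file.
        let positions = batch
            .column_by_name("pos")
            .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "missing pos column"))?;

        for row in 0..batch.num_rows() {
            positions_by_file
                .entry(file_paths.value(row).to_string())
                .or_default()
                .push(positions.value(row) as u64);
        }
    }

    Ok(positions_by_file)
}
```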