16
16
// under the License.
17
17
18
18
use std:: collections:: HashMap ;
19
- use std:: ops:: Deref ;
20
- use std:: sync:: { Arc , RwLock } ;
19
+ use std:: sync:: Arc ;
21
20
22
- use futures:: StreamExt ;
23
- use futures:: channel:: mpsc:: { Sender , channel} ;
24
- use tokio:: sync:: Notify ;
25
-
26
- use crate :: runtime:: spawn;
27
21
use crate :: scan:: { DeleteFileContext , FileScanTaskDeleteFile } ;
28
22
use crate :: spec:: { DataContentType , DataFile , Struct } ;
29
23
30
24
/// Index of delete files
31
- #[ derive( Debug , Clone ) ]
25
+ #[ derive( Debug , Default ) ]
32
26
pub ( crate ) struct DeleteFileIndex {
33
- state : Arc < RwLock < DeleteFileIndexState > > ,
34
- }
35
-
36
- #[ derive( Debug ) ]
37
- enum DeleteFileIndexState {
38
- Populating ( Arc < Notify > ) ,
39
- Populated ( PopulatedDeleteFileIndex ) ,
40
- }
41
-
42
- #[ derive( Debug ) ]
43
- struct PopulatedDeleteFileIndex {
44
27
#[ allow( dead_code) ]
45
28
global_deletes : Vec < Arc < DeleteFileContext > > ,
46
29
eq_deletes_by_partition : HashMap < Struct , Vec < Arc < DeleteFileContext > > > ,
47
30
pos_deletes_by_partition : HashMap < Struct , Vec < Arc < DeleteFileContext > > > ,
48
- // TODO: do we need this?
49
- // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
50
-
51
31
// TODO: Deletion Vector support
52
32
}
53
33
54
- impl DeleteFileIndex {
55
- /// create a new `DeleteFileIndex` along with the sender that populates it with delete files
56
- pub ( crate ) fn new ( ) -> ( DeleteFileIndex , Sender < DeleteFileContext > ) {
57
- // TODO: what should the channel limit be?
58
- let ( tx, rx) = channel ( 10 ) ;
59
- let notify = Arc :: new ( Notify :: new ( ) ) ;
60
- let state = Arc :: new ( RwLock :: new ( DeleteFileIndexState :: Populating (
61
- notify. clone ( ) ,
62
- ) ) ) ;
63
- let delete_file_stream = rx. boxed ( ) ;
64
-
65
- spawn ( {
66
- let state = state. clone ( ) ;
67
- async move {
68
- let delete_files = delete_file_stream. collect :: < Vec < _ > > ( ) . await ;
69
-
70
- let populated_delete_file_index = PopulatedDeleteFileIndex :: new ( delete_files) ;
71
-
72
- {
73
- let mut guard = state. write ( ) . unwrap ( ) ;
74
- * guard = DeleteFileIndexState :: Populated ( populated_delete_file_index) ;
75
- }
76
- notify. notify_waiters ( ) ;
77
- }
78
- } ) ;
79
-
80
- ( DeleteFileIndex { state } , tx)
81
- }
82
-
83
- /// Gets all the delete files that apply to the specified data file.
84
- pub ( crate ) async fn get_deletes_for_data_file (
85
- & self ,
86
- data_file : & DataFile ,
87
- seq_num : Option < i64 > ,
88
- ) -> Vec < FileScanTaskDeleteFile > {
89
- let notifier = {
90
- let guard = self . state . read ( ) . unwrap ( ) ;
91
- match * guard {
92
- DeleteFileIndexState :: Populating ( ref notifier) => notifier. clone ( ) ,
93
- DeleteFileIndexState :: Populated ( ref index) => {
94
- return index. get_deletes_for_data_file ( data_file, seq_num) ;
95
- }
96
- }
97
- } ;
98
-
99
- notifier. notified ( ) . await ;
100
-
101
- let guard = self . state . read ( ) . unwrap ( ) ;
102
- match guard. deref ( ) {
103
- DeleteFileIndexState :: Populated ( index) => {
104
- index. get_deletes_for_data_file ( data_file, seq_num)
105
- }
106
- _ => unreachable ! ( "Cannot be any other state than loaded" ) ,
107
- }
108
- }
109
- }
110
-
111
- impl PopulatedDeleteFileIndex {
112
- /// Creates a new populated delete file index from a list of delete file contexts, which
113
- /// allows for fast lookup when determining which delete files apply to a given data file.
114
- ///
115
- /// 1. The partition information is extracted from each delete file's manifest entry.
116
- /// 2. If the partition is empty and the delete file is not a positional delete,
117
- /// it is added to the `global_deletes` vector
118
- /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
119
- fn new ( files : Vec < DeleteFileContext > ) -> PopulatedDeleteFileIndex {
120
- let mut eq_deletes_by_partition: HashMap < Struct , Vec < Arc < DeleteFileContext > > > =
121
- HashMap :: default ( ) ;
122
- let mut pos_deletes_by_partition: HashMap < Struct , Vec < Arc < DeleteFileContext > > > =
123
- HashMap :: default ( ) ;
124
-
125
- let mut global_deletes: Vec < Arc < DeleteFileContext > > = vec ! [ ] ;
126
-
127
- files. into_iter ( ) . for_each ( |ctx| {
34
+ impl Extend < DeleteFileContext > for DeleteFileIndex {
35
+ fn extend < T : IntoIterator < Item = DeleteFileContext > > ( & mut self , iter : T ) {
36
+ // 1. The partition information is extracted from each delete file's manifest entry.
37
+ // 2. If the partition is empty and the delete file is not a positional delete,
38
+ // it is added to the `global_deletes` vector
39
+ // 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
40
+ for ctx in iter {
128
41
let arc_ctx = Arc :: new ( ctx) ;
129
42
130
43
let partition = arc_ctx. manifest_entry . data_file ( ) . partition ( ) ;
131
44
132
- // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
45
+ // The spec states that "Equality delete files stored with an unpartitioned spec
46
+ // are applied as global deletes".
133
47
if partition. fields ( ) . is_empty ( ) {
134
48
// TODO: confirm we're good to skip here if we encounter a pos del
135
49
if arc_ctx. manifest_entry . content_type ( ) != DataContentType :: PositionDeletes {
136
- global_deletes. push ( arc_ctx) ;
137
- return ;
50
+ self . global_deletes . push ( arc_ctx) ;
51
+ continue ;
138
52
}
139
53
}
140
54
141
55
let destination_map = match arc_ctx. manifest_entry . content_type ( ) {
142
- DataContentType :: PositionDeletes => & mut pos_deletes_by_partition,
143
- DataContentType :: EqualityDeletes => & mut eq_deletes_by_partition,
56
+ DataContentType :: PositionDeletes => & mut self . pos_deletes_by_partition ,
57
+ DataContentType :: EqualityDeletes => & mut self . eq_deletes_by_partition ,
144
58
_ => unreachable ! ( ) ,
145
59
} ;
146
60
@@ -150,17 +64,13 @@ impl PopulatedDeleteFileIndex {
150
64
entry. push ( arc_ctx. clone ( ) ) ;
151
65
} )
152
66
. or_insert ( vec ! [ arc_ctx. clone( ) ] ) ;
153
- } ) ;
154
-
155
- PopulatedDeleteFileIndex {
156
- global_deletes,
157
- eq_deletes_by_partition,
158
- pos_deletes_by_partition,
159
67
}
160
68
}
69
+ }
161
70
71
+ impl DeleteFileIndex {
162
72
/// Determine all the delete files that apply to the provided `DataFile`.
163
- fn get_deletes_for_data_file (
73
+ pub ( crate ) fn get_deletes_for_data_file (
164
74
& self ,
165
75
data_file : & DataFile ,
166
76
seq_num : Option < i64 > ,
0 commit comments