@@ -17,9 +17,12 @@ use std::collections::HashSet;
17
17
use std:: fmt:: Display ;
18
18
use std:: fmt:: Formatter ;
19
19
use std:: sync:: Arc ;
20
+ use std:: time:: Instant ;
20
21
21
22
use databend_common_base:: base:: GlobalUniqName ;
22
23
use databend_common_base:: base:: ProgressValues ;
24
+ use databend_common_base:: runtime:: profile:: Profile ;
25
+ use databend_common_base:: runtime:: profile:: ProfileStatisticsName ;
23
26
use databend_common_catalog:: table_context:: TableContext ;
24
27
use databend_common_exception:: Result ;
25
28
use databend_common_expression:: arrow:: deserialize_column;
@@ -35,8 +38,9 @@ use crate::spillers::spiller_buffer::SpillerBuffer;
35
38
/// Identifies which operator a spiller instance is spilling data for.
pub enum SpillerType {
    /// Spilling for the build phase of a hash join.
    HashJoinBuild,
    /// Spilling for the probe phase of a hash join.
    HashJoinProbe,
    /// Spilling for an order-by (sort) operator.
    OrderBy,
    // Todo: Add more spillers type
    // Aggregation
}
41
45
42
46
impl Display for SpillerType {
@@ -111,21 +115,31 @@ impl Spiller {
111
115
/// Read a spilled file back from storage and deserialize it into a
/// [`DataBlock`].
///
/// Returns the rebuilt block together with the total number of bytes read
/// from storage. As a side effect, records the spill-read profile
/// statistics (`SpillReadCount`, `SpillReadBytes`, `SpillReadTime`).
///
/// Precondition: `file` must have been produced by a previous spill, i.e.
/// its per-column byte layout is present in `self.columns_layout`
/// (checked via `debug_assert!`).
pub async fn read_spilled_file(&self, file: &str) -> Result<(DataBlock, u64)> {
    debug_assert!(self.columns_layout.contains_key(file));

    // Start timing before the storage read so SpillReadTime covers both the
    // I/O and the deserialization — mirroring spill_block, which starts its
    // Instant before writing so SpillWriteTime includes the write itself.
    let instant = Instant::now();
    let data = self.operator.read(file).await?;
    let bytes = data.len();

    // Per-column byte lengths recorded when this file was spilled.
    let columns_layout = self.columns_layout.get(file).unwrap();
    // Capacity = number of columns in this file. (The previous code used
    // `self.columns_layout.len()`, which is the number of spilled files
    // tracked in the map, not the column count of this file.)
    let mut columns = Vec::with_capacity(columns_layout.len());
    let mut begin = 0;
    for column_layout in columns_layout.iter() {
        // Each column occupies the next `column_layout` bytes of the file.
        columns.push(deserialize_column(&data[begin..begin + column_layout]).unwrap());
        begin += column_layout;
    }
    let block = DataBlock::new_from_columns(columns);

    Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1);
    Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, bytes);
    Profile::record_usize_profile(
        ProfileStatisticsName::SpillReadTime,
        instant.elapsed().as_millis() as usize,
    );

    Ok((block, bytes as u64))
}
126
139
127
140
/// Write a [`DataBlock`] to storage.
128
141
pub async fn spill_block ( & mut self , data : DataBlock ) -> Result < ( String , u64 ) > {
142
+ let instant = Instant :: now ( ) ;
129
143
let unique_name = GlobalUniqName :: unique ( ) ;
130
144
let location = format ! ( "{}/{}" , self . config. location_prefix, unique_name) ;
131
145
let mut write_bytes = 0 ;
@@ -155,6 +169,13 @@ impl Spiller {
155
169
}
156
170
writer. close ( ) . await ?;
157
171
172
+ Profile :: record_usize_profile ( ProfileStatisticsName :: SpillWriteCount , 1 ) ;
173
+ Profile :: record_usize_profile ( ProfileStatisticsName :: SpillWriteBytes , write_bytes as usize ) ;
174
+ Profile :: record_usize_profile (
175
+ ProfileStatisticsName :: SpillWriteTime ,
176
+ instant. elapsed ( ) . as_millis ( ) as usize ,
177
+ ) ;
178
+
158
179
Ok ( ( location, write_bytes) )
159
180
}
160
181
@@ -182,6 +203,7 @@ impl Spiller {
182
203
/// Read spilled data with partition id
183
204
pub async fn read_spilled_partition ( & self , p_id : & u8 ) -> Result < Vec < DataBlock > > {
184
205
debug_assert ! ( self . partition_location. contains_key( p_id) ) ;
206
+
185
207
let files = self . partition_location . get ( p_id) . unwrap ( ) . to_vec ( ) ;
186
208
let mut spilled_data = Vec :: with_capacity ( files. len ( ) ) ;
187
209
for file in files. iter ( ) {
@@ -190,6 +212,7 @@ impl Spiller {
190
212
spilled_data. push ( block) ;
191
213
}
192
214
}
215
+
193
216
Ok ( spilled_data)
194
217
}
195
218
0 commit comments