@@ -30,6 +30,7 @@ use common_expression::types::ValueType;
30
30
use common_expression:: Column ;
31
31
use common_expression:: ColumnBuilder ;
32
32
use common_expression:: Scalar ;
33
+ use common_hashtable:: HashSet as CommonHashSet ;
33
34
use common_hashtable:: HashSetWithStackMemory ;
34
35
use common_hashtable:: HashTableEntity ;
35
36
use common_hashtable:: HashTableKeyable ;
@@ -64,9 +65,13 @@ pub struct AggregateDistinctNumberState<T: Number + HashTableKeyable> {
64
65
inserted : bool ,
65
66
}
66
67
68
+ const HOLDER_CAPACITY : usize = 256 ;
69
+ const HOLDER_BYTES_CAPACITY : usize = HOLDER_CAPACITY * 8 ;
70
+
67
71
pub struct AggregateDistinctStringState {
68
- set : HashSet < KeysRef , RandomState > ,
69
- holder : StringColumnBuilder ,
72
+ set : CommonHashSet < KeysRef > ,
73
+ inserted : bool ,
74
+ holders : Vec < StringColumnBuilder > ,
70
75
}
71
76
72
77
pub struct DataGroupValue ;
@@ -148,26 +153,60 @@ impl DistinctStateFunc<DataGroupValue> for AggregateDistinctState {
148
153
}
149
154
}
150
155
156
+ impl AggregateDistinctStringState {
157
+ #[ inline]
158
+ fn insert_and_materialize ( & mut self , key : & KeysRef ) {
159
+ let entity = self . set . insert_key ( key, & mut self . inserted ) ;
160
+ if self . inserted {
161
+ let data = unsafe { key. as_slice ( ) } ;
162
+
163
+ let holder = self . holders . last_mut ( ) . unwrap ( ) ;
164
+ if holder. may_resize ( data. len ( ) ) {
165
+ let mut holder = StringColumnBuilder :: with_capacity (
166
+ HOLDER_CAPACITY ,
167
+ HOLDER_BYTES_CAPACITY . max ( data. len ( ) ) ,
168
+ ) ;
169
+ holder. put_slice ( data) ;
170
+ holder. commit_row ( ) ;
171
+ let value = unsafe { holder. index_unchecked ( holder. len ( ) - 1 ) } ;
172
+ entity. set_key ( KeysRef :: create ( value. as_ptr ( ) as usize , value. len ( ) ) ) ;
173
+ self . holders . push ( holder) ;
174
+ } else {
175
+ holder. put_slice ( data) ;
176
+ holder. commit_row ( ) ;
177
+ let value = unsafe { holder. index_unchecked ( holder. len ( ) - 1 ) } ;
178
+ entity. set_key ( KeysRef :: create ( value. as_ptr ( ) as usize , value. len ( ) ) ) ;
179
+ }
180
+ }
181
+ }
182
+ }
183
+
151
184
impl DistinctStateFunc < KeysRef > for AggregateDistinctStringState {
152
185
fn new ( ) -> Self {
153
186
AggregateDistinctStringState {
154
- set : HashSet :: new ( ) ,
155
- holder : StringColumnBuilder :: with_capacity ( 0 , 0 ) ,
187
+ set : CommonHashSet :: create ( ) ,
188
+ inserted : false ,
189
+ holders : vec ! [ StringColumnBuilder :: with_capacity(
190
+ HOLDER_CAPACITY ,
191
+ HOLDER_BYTES_CAPACITY ,
192
+ ) ] ,
156
193
}
157
194
}
158
195
159
196
fn serialize ( & self , writer : & mut BytesMut ) -> Result < ( ) > {
160
- serialize_into_buf ( writer, & self . holder )
197
+ serialize_into_buf ( writer, & self . holders )
161
198
}
162
199
163
200
fn deserialize ( & mut self , reader : & mut & [ u8 ] ) -> Result < ( ) > {
164
- self . holder = deserialize_from_slice ( reader) ?;
165
- self . set = HashSet :: with_capacity ( self . holder . len ( ) ) ;
166
-
167
- for index in 0 ..self . holder . len ( ) {
168
- let data = unsafe { self . holder . index_unchecked ( index) } ;
169
- let key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
170
- self . set . insert ( key) ;
201
+ self . holders = deserialize_from_slice ( reader) ?;
202
+ self . set = CommonHashSet :: with_capacity ( self . holders . iter ( ) . map ( |h| h. len ( ) ) . sum ( ) ) ;
203
+
204
+ for holder in self . holders . iter ( ) {
205
+ for index in 0 ..holder. len ( ) {
206
+ let data = unsafe { holder. index_unchecked ( index) } ;
207
+ let key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
208
+ self . set . insert_key ( & key, & mut self . inserted ) ;
209
+ }
171
210
}
172
211
Ok ( ( ) )
173
212
}
@@ -183,16 +222,8 @@ impl DistinctStateFunc<KeysRef> for AggregateDistinctStringState {
183
222
fn add ( & mut self , columns : & [ Column ] , row : usize ) -> Result < ( ) > {
184
223
let column = StringType :: try_downcast_column ( & columns[ 0 ] ) . unwrap ( ) ;
185
224
let data = unsafe { column. index_unchecked ( row) } ;
186
-
187
- let mut key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
188
-
189
- if !self . set . contains ( & key) {
190
- self . holder . put_slice ( data) ;
191
- self . holder . commit_row ( ) ;
192
- let data = unsafe { self . holder . index_unchecked ( self . holder . len ( ) - 1 ) } ;
193
- key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
194
- self . set . insert ( key) ;
195
- }
225
+ let key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
226
+ self . insert_and_materialize ( & key) ;
196
227
Ok ( ( ) )
197
228
}
198
229
@@ -204,48 +235,60 @@ impl DistinctStateFunc<KeysRef> for AggregateDistinctStringState {
204
235
) -> Result < ( ) > {
205
236
let column = StringType :: try_downcast_column ( & columns[ 0 ] ) . unwrap ( ) ;
206
237
207
- for row in 0 ..input_rows {
208
- match validity {
209
- Some ( v ) => {
238
+ match validity {
239
+ Some ( v ) => {
240
+ for row in 0 ..input_rows {
210
241
if v. get_bit ( row) {
211
242
let data = unsafe { column. index_unchecked ( row) } ;
212
- let mut key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
213
- if !self . set . contains ( & key) {
214
- self . holder . put_slice ( data) ;
215
- self . holder . commit_row ( ) ;
216
-
217
- let data =
218
- unsafe { self . holder . index_unchecked ( self . holder . len ( ) - 1 ) } ;
219
- key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
220
- self . set . insert ( key) ;
221
- }
243
+ let key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
244
+ self . insert_and_materialize ( & key) ;
222
245
}
223
246
}
224
- None => {
247
+ }
248
+ None => {
249
+ for row in 0 ..input_rows {
225
250
let data = unsafe { column. index_unchecked ( row) } ;
226
- let mut key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
227
- if !self . set . contains ( & key) {
228
- self . holder . put_slice ( data) ;
229
- self . holder . commit_row ( ) ;
230
-
231
- let data = unsafe { self . holder . index_unchecked ( self . holder . len ( ) - 1 ) } ;
232
- key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
233
- self . set . insert ( key) ;
234
- }
251
+ let key = KeysRef :: create ( data. as_ptr ( ) as usize , data. len ( ) ) ;
252
+ self . insert_and_materialize ( & key) ;
235
253
}
236
254
}
237
255
}
238
256
Ok ( ( ) )
239
257
}
240
258
241
259
fn merge ( & mut self , rhs : & Self ) -> Result < ( ) > {
242
- self . set . extend ( rhs. set . clone ( ) ) ;
260
+ for value in rhs. set . iter ( ) {
261
+ self . insert_and_materialize ( value. get_key ( ) ) ;
262
+ }
243
263
Ok ( ( ) )
244
264
}
245
265
246
266
fn build_columns ( & mut self , _types : & [ DataType ] ) -> Result < Vec < Column > > {
247
- let c = std:: mem:: replace ( & mut self . holder , StringColumnBuilder :: with_capacity ( 0 , 0 ) ) ;
248
- Ok ( vec ! [ Column :: String ( c. build( ) ) ] )
267
+ if self . holders . len ( ) == 1 {
268
+ let c = std:: mem:: replace (
269
+ & mut self . holders [ 0 ] ,
270
+ StringColumnBuilder :: with_capacity ( 0 , 0 ) ,
271
+ ) ;
272
+ return Ok ( vec ! [ Column :: String ( c. build( ) ) ] ) ;
273
+ }
274
+
275
+ let mut values = Vec :: with_capacity ( self . holders . iter ( ) . map ( |h| h. data . len ( ) ) . sum ( ) ) ;
276
+ let mut offsets = Vec :: with_capacity ( self . holders . iter ( ) . map ( |h| h. len ( ) ) . sum ( ) ) ;
277
+
278
+ let mut last_offset = 0 ;
279
+ offsets. push ( 0 ) ;
280
+ for holder in self . holders . iter_mut ( ) {
281
+ for offset in holder. offsets . iter ( ) {
282
+ last_offset += * offset;
283
+ offsets. push ( last_offset) ;
284
+ }
285
+ values. append ( & mut holder. data ) ;
286
+ }
287
+ let c = StringColumnBuilder {
288
+ data : values,
289
+ offsets,
290
+ } ;
291
+ return Ok ( vec ! [ Column :: String ( c. build( ) ) ] ) ;
249
292
}
250
293
}
251
294
0 commit comments