@@ -130,42 +130,46 @@ where
130
130
}
131
131
132
132
fn process ( & mut self ) -> Result < ( ) > {
133
- if let Some ( mut block) = self . input_data . pop ( ) {
134
- let bound_len = self . bounds . len ( ) ;
135
- let num_rows = block. num_rows ( ) ;
136
- let last = block. get_last_column ( ) . clone ( ) ;
137
- block. pop_columns ( 1 ) ;
138
- let mut builder = Vec :: with_capacity ( num_rows) ;
139
- let last_col = T :: try_downcast_column ( & last. remove_nullable ( ) ) . unwrap ( ) ;
140
- for index in 0 ..num_rows {
141
- let val =
142
- T :: to_owned_scalar ( unsafe { T :: index_column_unchecked ( & last_col, index) } ) ;
143
- if self . max_value . as_ref ( ) . is_some_and ( |v| val >= * v) {
144
- let range_id = bound_len + 1 ;
145
- builder. push ( range_id as u64 ) ;
146
- continue ;
147
- }
133
+ let start = Instant :: now ( ) ;
134
+ let mut block = {
135
+ let blocks = std:: mem:: take ( & mut self . input_data ) ;
136
+ DataBlock :: concat ( & blocks) ?
137
+ } ;
138
+
139
+ let bound_len = self . bounds . len ( ) ;
140
+ let num_rows = block. num_rows ( ) ;
141
+ let last = block. get_last_column ( ) . clone ( ) ;
142
+ block. pop_columns ( 1 ) ;
143
+ let mut builder = Vec :: with_capacity ( num_rows) ;
144
+ let last_col = T :: try_downcast_column ( & last. remove_nullable ( ) ) . unwrap ( ) ;
145
+ for index in 0 ..num_rows {
146
+ let val = T :: to_owned_scalar ( unsafe { T :: index_column_unchecked ( & last_col, index) } ) ;
147
+ if self . max_value . as_ref ( ) . is_some_and ( |v| val >= * v) {
148
+ let range_id = bound_len + 1 ;
149
+ builder. push ( range_id as u64 ) ;
150
+ continue ;
151
+ }
148
152
149
- let mut low = 0 ;
150
- let mut high = bound_len;
151
- while low < high {
152
- let mid = low + ( ( high - low) / 2 ) ;
153
- let bound = unsafe { self . bounds . get_unchecked ( mid) } . clone ( ) ;
154
- if val > bound {
155
- low = mid + 1 ;
156
- } else {
157
- high = mid;
158
- }
153
+ let mut low = 0 ;
154
+ let mut high = bound_len;
155
+ while low < high {
156
+ let mid = low + ( ( high - low) / 2 ) ;
157
+ let bound = unsafe { self . bounds . get_unchecked ( mid) } . clone ( ) ;
158
+ if val > bound {
159
+ low = mid + 1 ;
160
+ } else {
161
+ high = mid;
159
162
}
160
- builder. push ( low as u64 ) ;
161
163
}
162
-
163
- block. add_column ( BlockEntry :: new (
164
- DataType :: Number ( NumberDataType :: UInt64 ) ,
165
- Value :: Column ( UInt64Type :: from_data ( builder) ) ,
166
- ) ) ;
167
- self . output_data . push_back ( block) ;
164
+ builder. push ( low as u64 ) ;
168
165
}
166
+
167
+ block. add_column ( BlockEntry :: new (
168
+ DataType :: Number ( NumberDataType :: UInt64 ) ,
169
+ Value :: Column ( UInt64Type :: from_data ( builder) ) ,
170
+ ) ) ;
171
+ self . output_data . push_back ( block) ;
172
+ log:: info!( "Recluster range output: {:?}" , start. elapsed( ) ) ;
169
173
Ok ( ( ) )
170
174
}
171
175
0 commit comments