Skip to content

Commit 51676bd

Browse files
committed
fix
1 parent 8841e1d commit 51676bd

File tree

3 files changed: 37 additions and 12 deletions.

src/query/functions/src/scalars/hilbert.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,17 +257,17 @@ pub fn register(registry: &mut FunctionRegistry) {
257257
///
258258
/// # Example
259259
/// For boundaries [10, 20, 30]:
260-
/// - Values < 10 get partition ID 0
261-
/// - Values >= 10 and < 20 get partition ID 1
262-
/// - Values >= 20 and < 30 get partition ID 2
263-
/// - Values >= 30 get partition ID 3
260+
/// - Values <= 10 get partition ID 0
261+
/// - Values > 10 and <= 20 get partition ID 1
262+
/// - Values > 20 and <= 30 get partition ID 2
263+
/// - Values > 30 get partition ID 3
264264
fn calc_range_partition_id(val: ScalarRef, arr: &Column) -> u64 {
265265
let mut low = 0;
266266
let mut high = arr.len();
267267
while low < high {
268268
let mid = low + ((high - low) / 2);
269269
let bound = unsafe { arr.index_unchecked(mid) };
270-
if val >= bound {
270+
if val > bound {
271271
low = mid + 1;
272272
} else {
273273
high = mid;

src/query/service/src/pipelines/processors/transforms/recluster/recluster_sample_state.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl SampleState {
3535
completed_inputs: 0,
3636
values: vec![],
3737
bounds: vec![],
38+
max_value: None,
3839
}),
3940
done: Arc::new(WatchNotify::new()),
4041
})
@@ -56,17 +57,22 @@ impl SampleState {
5657
Ok(())
5758
}
5859

59-
pub fn get_bounds<T>(&self) -> Vec<T::Scalar>
60+
pub fn get_bounds<T>(&self) -> (Vec<T::Scalar>, Option<T::Scalar>)
6061
where
6162
T: ArgType,
6263
T::Scalar: Ord,
6364
{
6465
let inner = self.inner.read().unwrap();
65-
inner
66+
let bounds = inner
6667
.bounds
6768
.iter()
6869
.map(|v| T::to_owned_scalar(T::try_downcast_scalar(&v.as_ref()).unwrap()))
69-
.collect()
70+
.collect();
71+
let max_value = inner
72+
.max_value
73+
.as_ref()
74+
.map(|v| T::to_owned_scalar(T::try_downcast_scalar(&v.as_ref()).unwrap()));
75+
(bounds, max_value)
7076
}
7177
}
7278

@@ -76,6 +82,7 @@ pub struct SampleStateInner {
7682

7783
completed_inputs: usize,
7884
bounds: Vec<Scalar>,
85+
max_value: Option<Scalar>,
7986

8087
values: Vec<(u64, Vec<Scalar>)>,
8188
}
@@ -112,6 +119,9 @@ impl SampleStateInner {
112119
let col = T::upcast_column(T::column_from_vec(data.clone(), &[]));
113120
let indices = compare_columns(vec![col], total_samples)?;
114121

122+
let max_index = indices[total_samples - 1] as usize;
123+
let max_val = data[max_index].clone();
124+
115125
let mut cum_weight = 0.0;
116126
let mut target = step;
117127
let mut bounds = Vec::with_capacity(self.partitions - 1);
@@ -126,7 +136,13 @@ impl SampleStateInner {
126136
if cum_weight >= target {
127137
let data = &data[idx];
128138
if previous_bound.as_ref().is_none_or(|prev| data > prev) {
129-
bounds.push(T::upcast_scalar(data.clone()));
139+
if data == &max_val {
140+
self.max_value = Some(T::upcast_scalar(max_val));
141+
break;
142+
}
143+
144+
let bound = T::upcast_scalar(data.clone());
145+
bounds.push(bound);
130146
target += step;
131147
j += 1;
132148
previous_bound = Some(data.clone());

src/query/service/src/pipelines/processors/transforms/recluster/transform_range_partition_indexer.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ where T: ArgType
4444
input_data: Vec<DataBlock>,
4545
output_data: VecDeque<DataBlock>,
4646
bounds: Vec<T::Scalar>,
47+
max_value: Option<T::Scalar>,
4748
}
4849

4950
impl<T> TransformRangePartitionIndexer<T>
@@ -63,6 +64,7 @@ where
6364
input_data: vec![],
6465
output_data: VecDeque::new(),
6566
bounds: vec![],
67+
max_value: None,
6668
})
6769
}
6870
}
@@ -124,6 +126,7 @@ where
124126

125127
fn process(&mut self) -> Result<()> {
126128
if let Some(mut block) = self.input_data.pop() {
129+
let bound_len = self.bounds.len();
127130
let num_rows = block.num_rows();
128131
let last = block.get_last_column().clone();
129132
block.pop_columns(1);
@@ -132,12 +135,18 @@ where
132135
for index in 0..num_rows {
133136
let val =
134137
T::to_owned_scalar(unsafe { T::index_column_unchecked(&last_col, index) });
138+
if self.max_value.as_ref().is_some_and(|v| val >= *v) {
139+
let range_id = bound_len + 1;
140+
builder.push(range_id as u64);
141+
continue;
142+
}
143+
135144
let mut low = 0;
136-
let mut high = self.bounds.len();
145+
let mut high = bound_len;
137146
while low < high {
138147
let mid = low + ((high - low) / 2);
139148
let bound = unsafe { self.bounds.get_unchecked(mid) }.clone();
140-
if val >= bound {
149+
if val > bound {
141150
low = mid + 1;
142151
} else {
143152
high = mid;
@@ -158,7 +167,7 @@ where
158167
#[async_backtrace::framed]
159168
async fn async_process(&mut self) -> Result<()> {
160169
self.state.done.notified().await;
161-
self.bounds = self.state.get_bounds::<T>();
170+
(self.bounds, self.max_value) = self.state.get_bounds::<T>();
162171
Ok(())
163172
}
164173
}

0 commit comments

Comments
 (0)