Skip to content

Commit 302c3f2

Browse files
committed
add noise for hilbert recluster
1 parent c921fe4 commit 302c3f2

File tree

3 files changed

+104
-4
lines changed

3 files changed

+104
-4
lines changed

src/query/functions/src/scalars/hilbert.rs

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,33 @@ use databend_common_expression::types::BinaryType;
2121
use databend_common_expression::types::DataType;
2222
use databend_common_expression::types::GenericType;
2323
use databend_common_expression::types::NullableType;
24+
use databend_common_expression::types::NumberDataType;
2425
use databend_common_expression::types::NumberType;
2526
use databend_common_expression::types::ReturnType;
27+
use databend_common_expression::types::StringType;
2628
use databend_common_expression::types::ValueType;
29+
use databend_common_expression::types::ALL_NUMERICS_TYPES;
30+
use databend_common_expression::vectorize_with_builder_1_arg;
2731
use databend_common_expression::vectorize_with_builder_2_arg;
32+
use databend_common_expression::with_number_mapped_type;
2833
use databend_common_expression::Column;
2934
use databend_common_expression::FixedLengthEncoding;
3035
use databend_common_expression::Function;
3136
use databend_common_expression::FunctionDomain;
3237
use databend_common_expression::FunctionEval;
3338
use databend_common_expression::FunctionFactory;
39+
use databend_common_expression::FunctionProperty;
3440
use databend_common_expression::FunctionRegistry;
3541
use databend_common_expression::FunctionSignature;
3642
use databend_common_expression::ScalarRef;
3743
use databend_common_expression::Value;
44+
use rand::rngs::SmallRng;
45+
use rand::Rng;
46+
use rand::SeedableRng;
3847

3948
/// Registers Hilbert curve related functions with the function registry.
4049
pub fn register(registry: &mut FunctionRegistry) {
41-
// Register the hilbert_range_index function that calculates Hilbert indices for multi-dimensional data
50+
// Register the hilbert_range_index function that calculates Hilbert indices for multidimensional data
4251
let factory = FunctionFactory::Closure(Box::new(|_, args_type: &[DataType]| {
4352
let args_num = args_type.len();
4453
// The function supports 2, 3, 4, or 5 dimensions (each dimension requires 2 arguments)
@@ -97,7 +106,7 @@ pub fn register(registry: &mut FunctionRegistry) {
97106
points.push(key);
98107
}
99108

100-
// Convert the multi-dimensional point to a Hilbert index
109+
// Convert the multidimensional point to a Hilbert index
101110
// This maps the n-dimensional point to a 1-dimensional value
102111
let points = points
103112
.iter()
@@ -153,6 +162,88 @@ pub fn register(registry: &mut FunctionRegistry) {
153162
builder.push(id);
154163
}),
155164
);
165+
166+
// We use true randomness by appending a random u8 value at the end of the binary key.
167+
// This introduces noise to break tie cases in clustering keys that are not uniformly distributed.
168+
// Although this may slightly affect the accuracy of range_bound estimation,
169+
// it ensures that Hilbert index + scatter will no longer suffer from data skew.
170+
// Moreover, since the noise is added at the tail, the original order of the keys is preserved.
171+
registry.properties.insert(
172+
"add_noise".to_string(),
173+
FunctionProperty::default().non_deterministic(),
174+
);
175+
176+
registry.register_passthrough_nullable_1_arg::<StringType, BinaryType, _, _>(
177+
"add_noise",
178+
|_, _| FunctionDomain::Full,
179+
vectorize_with_builder_1_arg::<StringType, BinaryType>(|val, builder, _| {
180+
let mut bytes = val.as_bytes().to_vec();
181+
let mut rng = SmallRng::from_entropy();
182+
bytes.push(rng.gen::<u8>());
183+
builder.put_slice(&bytes);
184+
builder.commit_row();
185+
}),
186+
);
187+
188+
for ty in ALL_NUMERICS_TYPES {
189+
with_number_mapped_type!(|NUM_TYPE| match ty {
190+
NumberDataType::NUM_TYPE => {
191+
registry
192+
.register_passthrough_nullable_1_arg::<NumberType<NUM_TYPE>, BinaryType, _, _>(
193+
"add_noise",
194+
|_, _| FunctionDomain::Full,
195+
vectorize_with_builder_1_arg::<NumberType<NUM_TYPE>, BinaryType>(
196+
|val, builder, _| {
197+
let mut encoded = val.encode().to_vec();
198+
let mut rng = SmallRng::from_entropy();
199+
encoded.push(rng.gen::<u8>());
200+
builder.put_slice(&encoded);
201+
builder.commit_row();
202+
},
203+
),
204+
);
205+
}
206+
})
207+
}
208+
209+
registry.register_passthrough_nullable_2_arg::<StringType, NumberType<u64>, BinaryType, _, _>(
210+
"add_noise",
211+
|_, _, _| FunctionDomain::Full,
212+
vectorize_with_builder_2_arg::<StringType, NumberType<u64>, BinaryType>(
213+
|val, level, builder, _| {
214+
let mut bytes = val.as_bytes().to_vec();
215+
let mut rng = SmallRng::from_entropy();
216+
for _ in 0..level {
217+
bytes.push(rng.gen::<u8>());
218+
}
219+
builder.put_slice(&bytes);
220+
builder.commit_row();
221+
},
222+
),
223+
);
224+
225+
for ty in ALL_NUMERICS_TYPES {
226+
with_number_mapped_type!(|NUM_TYPE| match ty {
227+
NumberDataType::NUM_TYPE => {
228+
registry
229+
.register_passthrough_nullable_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType, _, _>(
230+
"add_noise",
231+
|_, _, _| FunctionDomain::Full,
232+
vectorize_with_builder_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType>(
233+
|val, level, builder, _| {
234+
let mut encoded = val.encode().to_vec();
235+
let mut rng = SmallRng::from_entropy();
236+
for _ in 0..level {
237+
encoded.push(rng.gen::<u8>());
238+
}
239+
builder.put_slice(&encoded);
240+
builder.commit_row();
241+
},
242+
),
243+
);
244+
}
245+
})
246+
}
156247
}
157248

158249
/// Calculates the partition ID for a value based on range boundaries.

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ impl ReclusterTableInterpreter {
652652
"range_bound(1000, {sample_size})({cluster_key_str})"
653653
));
654654

655-
hilbert_keys.push(format!("{table}.{cluster_key_str}, []"));
655+
hilbert_keys.push(format!("{cluster_key_str}, []"));
656656
}
657657
let hilbert_keys_str = hilbert_keys.join(", ");
658658

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ use crate::plans::VacuumTemporaryFilesPlan;
149149
use crate::BindContext;
150150
use crate::DefaultExprBinder;
151151
use crate::Planner;
152+
use crate::ScalarExpr;
152153
use crate::SelectBuilder;
153154

154155
pub(in crate::planner::binder) struct AnalyzeCreateTableResult {
@@ -1767,14 +1768,22 @@ impl Binder {
17671768

17681769
let mut cluster_keys = Vec::with_capacity(expr_len);
17691770
for cluster_expr in cluster_exprs.iter() {
1770-
let (cluster_key, _) = scalar_binder.bind(cluster_expr)?;
1771+
let (mut cluster_key, _) = scalar_binder.bind(cluster_expr)?;
17711772
if cluster_key.used_columns().len() != 1 || !cluster_key.evaluable() {
17721773
return Err(ErrorCode::InvalidClusterKeys(format!(
17731774
"Cluster by expression `{:#}` is invalid",
17741775
cluster_expr
17751776
)));
17761777
}
17771778

1779+
if let ScalarExpr::FunctionCall(func) = &cluster_key {
1780+
if func.func_name == "add_noise" && matches!(cluster_type, AstClusterType::Hilbert)
1781+
{
1782+
debug_assert!(func.arguments.len() == 1);
1783+
cluster_key = func.arguments[0].clone();
1784+
}
1785+
}
1786+
17781787
let expr = cluster_key.as_expr()?;
17791788
if !expr.is_deterministic(&BUILTIN_FUNCTIONS) {
17801789
return Err(ErrorCode::InvalidClusterKeys(format!(

0 commit comments

Comments
 (0)