Skip to content

Commit 841e4cf

Browse files
committed
update
1 parent 302c3f2 commit 841e4cf

File tree

10 files changed

+759
-5
lines changed

10 files changed

+759
-5
lines changed

src/common/base/src/base/watch_notify.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ impl WatchNotify {
4444
pub fn notify_waiters(&self) {
4545
let _ = self.tx.send_replace(true);
4646
}
47+
48+
pub fn is_notified(&self) -> bool {
49+
*self.rx.borrow()
50+
}
4751
}
4852

4953
#[cfg(test)]

src/query/functions/src/aggregates/aggregate_range_bound.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,7 @@ pub fn try_create_aggregate_range_bound_function(
326326
/// For a column with values `(0, 1, 3, 6, 8)` and `partition_num = 3`, the function calculates the
327327
/// partition boundaries based on the distribution of the data. The boundaries might be `[1, 6]`.
328328
pub fn aggregate_range_bound_function_desc() -> AggregateFunctionDescription {
329-
AggregateFunctionDescription::creator(Box::new(
330-
crate::aggregates::try_create_aggregate_range_bound_function,
331-
))
329+
AggregateFunctionDescription::creator(Box::new(try_create_aggregate_range_bound_function))
332330
}
333331

334332
fn get_partitions(
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_exception::Result;
18+
use databend_common_expression::row::RowConverter as CommonConverter;
19+
use databend_common_expression::types::AccessType;
20+
use databend_common_expression::types::ArgType;
21+
use databend_common_expression::types::DataType;
22+
use databend_common_expression::types::DateType;
23+
use databend_common_expression::types::NumberDataType;
24+
use databend_common_expression::types::NumberType;
25+
use databend_common_expression::types::StringType;
26+
use databend_common_expression::types::TimestampType;
27+
use databend_common_expression::with_number_mapped_type;
28+
use databend_common_expression::DataSchemaRef;
29+
use databend_common_expression::SortColumnDescription;
30+
use databend_common_pipeline_core::processors::InputPort;
31+
use databend_common_pipeline_core::processors::OutputPort;
32+
use databend_common_pipeline_core::processors::Processor;
33+
use databend_common_pipeline_transforms::sort::CommonRows;
34+
use databend_common_pipeline_transforms::sort::RowConverter;
35+
use databend_common_pipeline_transforms::sort::Rows;
36+
use databend_common_pipeline_transforms::sort::SimpleRowConverter;
37+
use databend_common_pipeline_transforms::sort::SimpleRowsAsc;
38+
use databend_common_pipeline_transforms::AccumulatingTransformer;
39+
use databend_common_pipeline_transforms::Transformer;
40+
use match_template::match_template;
41+
42+
use crate::pipelines::processors::transforms::recluster::transform_add_order_column::TransformAddOrderColumn;
43+
use crate::pipelines::processors::transforms::recluster::TransformRangePartitionIndexer;
44+
use crate::pipelines::processors::transforms::SampleState;
45+
use crate::pipelines::processors::transforms::TransformReclusterCollect;
46+
47+
pub struct TransformReclusterBuilder {
48+
schema: DataSchemaRef,
49+
sort_desc: Arc<[SortColumnDescription]>,
50+
sample_rate: f64,
51+
seed: u64,
52+
}
53+
54+
impl TransformReclusterBuilder {
55+
pub fn build_recluster_sample(
56+
&self,
57+
input: Arc<InputPort>,
58+
output: Arc<OutputPort>,
59+
) -> Result<Box<dyn Processor>> {
60+
self.build_inner(BuilderType::ReclusterSample, input, output, None)
61+
}
62+
63+
pub fn build_range_partition_indexer(
64+
&self,
65+
input: Arc<InputPort>,
66+
output: Arc<OutputPort>,
67+
state: Arc<SampleState>,
68+
) -> Result<Box<dyn Processor>> {
69+
self.build_inner(
70+
BuilderType::RangePartitionIndexer,
71+
input,
72+
output,
73+
Some(state),
74+
)
75+
}
76+
77+
pub fn build_add_order_column(
78+
&self,
79+
input: Arc<InputPort>,
80+
output: Arc<OutputPort>,
81+
) -> Result<Box<dyn Processor>> {
82+
self.build_inner(BuilderType::AddOrderColumn, input, output, None)
83+
}
84+
85+
fn build_inner(
86+
&self,
87+
typ: BuilderType,
88+
input: Arc<InputPort>,
89+
output: Arc<OutputPort>,
90+
state: Option<Arc<SampleState>>,
91+
) -> Result<Box<dyn Processor>> {
92+
let mut build = BuilderInner {
93+
input,
94+
output,
95+
typ,
96+
base: self,
97+
state,
98+
};
99+
build.select_row_type()
100+
}
101+
}
102+
103+
enum BuilderType {
104+
AddOrderColumn,
105+
ReclusterSample,
106+
RangePartitionIndexer,
107+
}
108+
109+
struct BuilderInner<'a> {
110+
input: Arc<InputPort>,
111+
output: Arc<OutputPort>,
112+
typ: BuilderType,
113+
base: &'a TransformReclusterBuilder,
114+
state: Option<Arc<SampleState>>,
115+
}
116+
117+
impl BuilderInner<'_> {
118+
pub fn select_row_type(&mut self) -> Result<Box<dyn Processor>> {
119+
match self.base.sort_desc.as_ref() {
120+
[desc] => {
121+
let schema = self.base.schema.clone();
122+
let sort_type = schema.field(desc.offset).data_type();
123+
assert!(desc.asc);
124+
125+
match_template! {
126+
T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
127+
match sort_type {
128+
DataType::T => {
129+
self.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>()
130+
},
131+
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
132+
NumberDataType::NUM_TYPE => {
133+
self.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
134+
}
135+
}),
136+
_ => self.visit_type::<CommonRows, CommonConverter>()
137+
}
138+
}
139+
}
140+
_ => self.visit_type::<CommonRows, CommonConverter>(),
141+
}
142+
}
143+
144+
fn visit_type<R, C>(&mut self) -> Result<Box<dyn Processor>>
145+
where
146+
R: Rows + 'static,
147+
C: RowConverter<R> + Send + 'static,
148+
R::Type: ArgType + Send + Sync,
149+
<R::Type as AccessType>::Scalar: Ord + Send + Sync,
150+
{
151+
match self.typ {
152+
BuilderType::AddOrderColumn => self.build_add_order_column::<R, C>(),
153+
BuilderType::ReclusterSample => self.build_recluster_sample::<R::Type>(),
154+
BuilderType::RangePartitionIndexer => self.build_range_partition_indexer::<R::Type>(),
155+
}
156+
}
157+
158+
fn build_add_order_column<R, C>(&mut self) -> Result<Box<dyn Processor>>
159+
where
160+
R: Rows + 'static,
161+
C: RowConverter<R> + Send + 'static,
162+
{
163+
let inner = TransformAddOrderColumn::<R, C>::try_new(
164+
self.base.sort_desc.clone(),
165+
self.base.schema.clone(),
166+
)?;
167+
Ok(Transformer::create(
168+
self.input.clone(),
169+
self.output.clone(),
170+
inner,
171+
))
172+
}
173+
174+
fn build_range_partition_indexer<T>(&mut self) -> Result<Box<dyn Processor>>
175+
where
176+
T: ArgType + Send + Sync,
177+
T::Scalar: Ord + Send + Sync,
178+
{
179+
Ok(TransformRangePartitionIndexer::<T>::create(
180+
self.input.clone(),
181+
self.output.clone(),
182+
self.state.clone().unwrap(),
183+
))
184+
}
185+
186+
fn build_recluster_sample<T>(&mut self) -> Result<Box<dyn Processor>>
187+
where
188+
T: ArgType + Send + Sync,
189+
T::Scalar: Ord + Send + Sync,
190+
{
191+
let offset = self.base.schema.fields().len();
192+
Ok(AccumulatingTransformer::create(
193+
self.input.clone(),
194+
self.output.clone(),
195+
TransformReclusterCollect::<T>::new(offset, self.base.sample_rate, self.base.seed),
196+
))
197+
}
198+
}

src/query/service/src/pipelines/processors/transforms/recluster/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod builder;
16+
mod range_bound_sampler;
1517
mod recluster_partition_exchange;
1618
mod recluster_partition_strategy;
19+
mod recluster_sample_state;
20+
mod transform_add_order_column;
21+
mod transform_range_partition_indexer;
22+
mod transform_recluster_collect;
1723

24+
pub use range_bound_sampler::RangeBoundSampler;
1825
pub use recluster_partition_exchange::ReclusterPartitionExchange;
1926
pub use recluster_partition_strategy::CompactPartitionStrategy;
2027
pub use recluster_partition_strategy::ReclusterPartitionStrategy;
28+
pub(crate) use recluster_sample_state::SampleState;
29+
pub use transform_range_partition_indexer::TransformRangePartitionIndexer;
30+
pub(crate) use transform_recluster_collect::ReclusterSampleMeta;
31+
pub use transform_recluster_collect::TransformReclusterCollect;
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::marker::PhantomData;
16+
17+
use databend_common_expression::types::ArgType;
18+
use databend_common_expression::types::ValueType;
19+
use databend_common_expression::DataBlock;
20+
use databend_common_expression::Scalar;
21+
use rand::prelude::SliceRandom;
22+
use rand::prelude::SmallRng;
23+
use rand::SeedableRng;
24+
25+
pub struct RangeBoundSampler<T>
26+
where T: ValueType
27+
{
28+
offset: usize,
29+
sample_rate: f64,
30+
rng: SmallRng,
31+
32+
values: Vec<(u64, Vec<Scalar>)>,
33+
_t: PhantomData<T>,
34+
}
35+
36+
impl<T> RangeBoundSampler<T>
37+
where T: ValueType
38+
{
39+
pub fn new(offset: usize, sample_rate: f64, seed: u64) -> Self {
40+
let rng = SmallRng::seed_from_u64(seed);
41+
Self {
42+
offset,
43+
sample_rate,
44+
rng,
45+
values: vec![],
46+
_t: PhantomData,
47+
}
48+
}
49+
}
50+
51+
impl<T> RangeBoundSampler<T>
52+
where
53+
T: ArgType,
54+
T::Scalar: Ord + Send,
55+
{
56+
pub fn add_block(&mut self, data: &DataBlock) {
57+
let rows = data.num_rows();
58+
assert!(rows > 0);
59+
let column = data.get_by_offset(self.offset).to_column(rows);
60+
61+
let sample_size = std::cmp::max((self.sample_rate * rows as f64).ceil() as usize, 100);
62+
let mut indices = (0..rows).collect::<Vec<_>>();
63+
64+
let sampled_indices = if rows > sample_size {
65+
indices.shuffle(&mut self.rng);
66+
&indices[..sample_size]
67+
} else {
68+
&indices
69+
};
70+
71+
let column = T::try_downcast_column(&column).unwrap();
72+
let sample_values = sampled_indices
73+
.iter()
74+
.map(|i| {
75+
T::upcast_scalar(T::to_owned_scalar(unsafe {
76+
T::index_column_unchecked(&column, *i)
77+
}))
78+
})
79+
.collect::<Vec<_>>();
80+
self.values.push((rows as u64, sample_values));
81+
}
82+
83+
pub fn sample_values(&mut self) -> Vec<(u64, Vec<Scalar>)> {
84+
std::mem::take(&mut self.values)
85+
}
86+
}

src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_exchange.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ impl ReclusterPartitionExchange {
3333

3434
impl Exchange for ReclusterPartitionExchange {
3535
const NAME: &'static str = "Recluster";
36-
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
37-
let mut data_block = data_block;
36+
fn partition(&self, mut data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
3837
let range_ids = data_block
3938
.get_last_column()
4039
.as_number()

0 commit comments

Comments
 (0)