Skip to content

Commit 7d52ca7

Browse files
committed
for test
1 parent f6ba7e2 commit 7d52ca7

File tree

3 files changed

+18
-0
lines changed

3 files changed

+18
-0
lines changed

src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_exchange.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::sync::Arc;
16+
use std::time::Instant;
1617

1718
use databend_common_exception::Result;
1819
use databend_common_expression::DataBlock;
@@ -34,6 +35,7 @@ impl ReclusterPartitionExchange {
3435
impl Exchange for ReclusterPartitionExchange {
3536
const NAME: &'static str = "Recluster";
3637
fn partition(&self, mut data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
38+
let start = Instant::now();
3739
let range_ids = data_block
3840
.get_last_column()
3941
.as_number()
@@ -58,6 +60,7 @@ impl Exchange for ReclusterPartitionExchange {
5860
output_data_blocks[target].push((partition_id, block));
5961
}
6062
}
63+
log::info!("Recluster range exchange: {:?}", start.elapsed());
6164

6265
// Union data blocks for each processor.
6366
Ok(output_data_blocks

src/query/service/src/pipelines/processors/transforms/recluster/transform_range_partition_indexer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::any::Any;
1616
use std::collections::VecDeque;
1717
use std::sync::Arc;
18+
use std::time::Instant;
1819

1920
use databend_common_exception::Result;
2021
use databend_common_expression::types::ArgType;
@@ -45,6 +46,8 @@ where T: ArgType
4546
output_data: VecDeque<DataBlock>,
4647
bounds: Vec<T::Scalar>,
4748
max_value: Option<T::Scalar>,
49+
50+
start: Instant,
4851
}
4952

5053
impl<T> TransformRangePartitionIndexer<T>
@@ -65,6 +68,7 @@ where
6568
output_data: VecDeque::new(),
6669
bounds: vec![],
6770
max_value: None,
71+
start: Instant::now(),
6872
})
6973
}
7074
}
@@ -121,6 +125,7 @@ where
121125
.expect("require a ReclusterSampleMeta");
122126
self.input_data = meta.blocks;
123127
self.state.merge_sample::<T>(meta.sample_values)?;
128+
log::info!("Recluster range partition: {:?}", self.start.elapsed());
124129
Ok(Event::Async)
125130
}
126131

src/query/service/src/pipelines/processors/transforms/recluster/transform_recluster_partition.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::any::Any;
1616
use std::collections::VecDeque;
1717
use std::sync::Arc;
18+
use std::time::Instant;
1819

1920
use databend_common_exception::Result;
2021
use databend_common_expression::BlockMetaInfoDowncast;
@@ -68,6 +69,9 @@ pub struct TransformReclusterPartition {
6869
partition_data: Vec<PartitionData>,
6970
output_data: VecDeque<DataBlock>,
7071

72+
start: Instant,
73+
cnt: usize,
74+
7175
step: Step,
7276
}
7377

@@ -98,6 +102,8 @@ impl TransformReclusterPartition {
98102
partition_data,
99103
output_data: VecDeque::new(),
100104
step: Step::Consume,
105+
start: Instant::now(),
106+
cnt: 0,
101107
},
102108
)))
103109
}
@@ -133,6 +139,10 @@ impl Processor for TransformReclusterPartition {
133139

134140
if self.input.is_finished() {
135141
if !self.partition_data.is_empty() {
142+
if self.cnt == 0 {
143+
log::info!("Recluster: start flush: {:?}", self.start.elapsed());
144+
}
145+
self.cnt += 1;
136146
self.step = Step::Flush;
137147
return Ok(Event::Sync);
138148
}

0 commit comments

Comments
 (0)