Skip to content

Commit 6f6df79

Browse files
authored
chore: fix some flood logs with batch progress (#15492)
1 parent 916e70e commit 6f6df79

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::any::Any;
1616
use std::collections::VecDeque;
1717
use std::sync::Arc;
18+
use std::time::Duration;
1819
use std::time::Instant;
1920

2021
use databend_common_base::runtime::profile::Profile;
@@ -199,6 +200,11 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Processor
199200
self.deserializing_meta = Some((block_meta, VecDeque::from(vec![data])));
200201
}
201202
AggregateMeta::Partitioned { data, .. } => {
203+
// For log progress.
204+
let mut total_elapsed = Duration::default();
205+
let log_interval = 100;
206+
let mut processed_count = 0;
207+
202208
let mut read_data = Vec::with_capacity(data.len());
203209
for meta in data {
204210
if let AggregateMeta::BucketSpilled(payload) = meta {
@@ -229,11 +235,18 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Processor
229235
);
230236
}
231237

232-
info!(
233-
"Read aggregate spill {} successfully, elapsed: {:?}",
234-
location,
235-
instant.elapsed()
236-
);
238+
total_elapsed += instant.elapsed();
239+
processed_count += 1;
240+
241+
// log the progress
242+
if processed_count % log_interval == 0 {
243+
info!(
244+
"Read aggregate {}/{} spilled buckets, elapsed: {:?}",
245+
processed_count,
246+
data.len(),
247+
total_elapsed
248+
);
249+
}
237250

238251
Ok(data)
239252
}));
@@ -251,6 +264,11 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Processor
251264
self.deserializing_meta = Some((block_meta, read_data?));
252265
}
253266
};
267+
268+
info!(
269+
"Read {} aggregate spills successfully, total elapsed: {:?}",
270+
processed_count, total_elapsed
271+
);
254272
}
255273
}
256274
}

src/query/sql/src/evaluator/cse.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::collections::HashMap;
1616

1717
use databend_common_expression::Expr;
18-
use log::info;
18+
use log::debug;
1919

2020
use super::BlockOperator;
2121
use crate::optimizer::ColumnSet;
@@ -63,7 +63,7 @@ pub fn apply_cse(
6363
let mut expr_cloned = cse_candidate.clone();
6464
perform_cse_replacement(&mut expr_cloned, &cse_replacements);
6565

66-
info!(
66+
debug!(
6767
"cse_candidate: {}, temp_expr: {}",
6868
expr_cloned.sql_display(),
6969
temp_expr.sql_display()

0 commit comments

Comments
 (0)