Skip to content

Commit eb671da

Browse files
authored
fix: prevent log table from logging its own logs (#17802)
* fix: prevent log table from logging its own logs
* add test
* docs: add comments for default stage name
* chore: refine more comments and add a config to control retention frequency
* chore: refine more comments and add a config to control retention frequency
* chore: refine more comments and add a config to control retention frequency
* chore: refine more comments and add a config to control retention frequency
* update config file
* handle cluster mode
* Revert "handle cluster mode"
  This reverts commit 69cacfd.
* handle cluster mode
* rebase
1 parent 7f7c735 commit eb671da

File tree

9 files changed

+147
-38
lines changed

9 files changed

+147
-38
lines changed

src/common/base/src/runtime/runtime_tracker.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ pub struct TrackingPayload {
103103
pub profile: Option<Arc<Profile>>,
104104
pub mem_stat: Option<Arc<MemStat>>,
105105
pub metrics: Option<Arc<ScopedRegistry>>,
106+
pub should_log: bool,
106107
}
107108

108109
pub struct TrackingGuard {
@@ -163,6 +164,7 @@ impl ThreadTracker {
163164
metrics: None,
164165
mem_stat: None,
165166
query_id: None,
167+
should_log: true,
166168
},
167169
}
168170
}
@@ -243,6 +245,10 @@ impl ThreadTracker {
243245
.map(|query_id| unsafe { &*(query_id as *const String) })
244246
})
245247
}
248+
249+
/// Returns the `should_log` flag of the payload currently held by `TRACKER`.
///
/// The flag is read through `TRACKER.with(..)` and a shared `borrow()`, so this call
/// does not modify the tracker.
///
/// NOTE(review): `TRACKER` is declared outside this hunk; it looks like a thread-local,
/// in which case the value reflects the calling thread's payload — confirm.
pub fn should_log() -> bool {
250+
TRACKER.with(|tracker| tracker.borrow().payload.should_log)
251+
}
246252
}
247253

248254
pin_project! {

src/common/tracing/src/config.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,14 +342,20 @@ pub struct PersistentLogConfig {
342342
pub stage_name: String,
343343
pub level: String,
344344
pub retention: usize,
345+
pub retention_interval: usize,
345346
}
346347

347348
impl Display for PersistentLogConfig {
348349
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
349350
write!(
350351
f,
351-
"enabled={}, interval={}, stage_name={}, level={}, retention={}",
352-
self.on, self.interval, self.stage_name, self.level, self.retention
352+
"enabled={}, interval={}, stage_name={}, level={}, retention={}, retention_interval={}",
353+
self.on,
354+
self.interval,
355+
self.stage_name,
356+
self.level,
357+
self.retention,
358+
self.retention_interval
353359
)
354360
}
355361
}
@@ -359,9 +365,13 @@ impl Default for PersistentLogConfig {
359365
Self {
360366
on: false,
361367
interval: 2,
368+
// The default stage name embeds a UUID to avoid conflicts with existing stages
362369
stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
363-
retention: 72,
364370
level: "WARN".to_string(),
371+
// Data older than 72 hours will be deleted during retention tasks
372+
retention: 72,
373+
// Trigger the retention task every 24 hours
374+
retention_interval: 24,
365375
}
366376
}
367377
}

src/common/tracing/src/filter.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_base::runtime::ThreadTracker;
16+
use logforth::filter::CustomFilter;
17+
use logforth::filter::FilterResult;
18+
19+
/// Builds a logforth `CustomFilter` whose decision is driven by `ThreadTracker::should_log()`.
///
/// The filter ignores the log metadata it is given. For each check it returns
/// `FilterResult::Neutral` when `should_log()` is `true` and `FilterResult::Reject`
/// when it is `false`.
///
/// NOTE(review): `Neutral` presumably leaves the decision to the other filters on the
/// same dispatch, while `Reject` drops the record — confirm against the logforth docs.
pub fn filter_by_thread_tracker() -> CustomFilter {
20+
CustomFilter::new(|_metadata| match ThreadTracker::should_log() {
21+
true => FilterResult::Neutral,
22+
false => FilterResult::Reject,
23+
})
24+
}

src/common/tracing/src/init.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use opendal::Operator;
3131
use opentelemetry_otlp::WithExportConfig;
3232

3333
use crate::config::OTLPProtocol;
34+
use crate::filter::filter_by_thread_tracker;
3435
use crate::loggers::get_layout;
3536
use crate::loggers::new_rolling_file_appender;
3637
use crate::remote_log::RemoteLog;
@@ -219,6 +220,7 @@ pub fn init_logging(
219220

220221
let dispatch = Dispatch::new()
221222
.filter(env_filter(&cfg.file.level))
223+
.filter(filter_by_thread_tracker())
222224
.append(normal_log_file.with_layout(get_layout(&cfg.file.format)));
223225
logger = logger.dispatch(dispatch);
224226
}
@@ -227,6 +229,7 @@ pub fn init_logging(
227229
if cfg.stderr.on {
228230
let dispatch = Dispatch::new()
229231
.filter(env_filter(&cfg.stderr.level))
232+
.filter(filter_by_thread_tracker())
230233
.append(
231234
logforth::append::Stderr::default().with_layout(get_layout(&cfg.stderr.format)),
232235
);
@@ -253,6 +256,7 @@ pub fn init_logging(
253256
.expect("initialize opentelemetry logger");
254257
let dispatch = Dispatch::new()
255258
.filter(env_filter(&cfg.otlp.level))
259+
.filter(filter_by_thread_tracker())
256260
.append(otel);
257261
logger = logger.dispatch(dispatch);
258262
}
@@ -261,6 +265,7 @@ pub fn init_logging(
261265
if cfg.tracing.on || cfg.structlog.on {
262266
let dispatch = Dispatch::new()
263267
.filter(env_filter(&cfg.tracing.capture_log_level))
268+
.filter(filter_by_thread_tracker())
264269
.append(logforth::append::FastraceEvent::default());
265270
logger = logger.dispatch(dispatch);
266271
}
@@ -281,6 +286,7 @@ pub fn init_logging(
281286
EnvFilterBuilder::new()
282287
.filter(Some("databend::log::query"), LevelFilter::Trace),
283288
))
289+
.filter(filter_by_thread_tracker())
284290
.append(query_log_file.with_layout(get_layout("identical")));
285291
logger = logger.dispatch(dispatch);
286292
}
@@ -306,6 +312,7 @@ pub fn init_logging(
306312
EnvFilterBuilder::new()
307313
.filter(Some("databend::log::query"), LevelFilter::Trace),
308314
))
315+
.filter(filter_by_thread_tracker())
309316
.append(otel);
310317
logger = logger.dispatch(dispatch);
311318
}
@@ -327,6 +334,7 @@ pub fn init_logging(
327334
EnvFilterBuilder::new()
328335
.filter(Some("databend::log::profile"), LevelFilter::Trace),
329336
))
337+
.filter(filter_by_thread_tracker())
330338
.append(profile_log_file.with_layout(get_layout("identical")));
331339
logger = logger.dispatch(dispatch);
332340
}
@@ -352,6 +360,7 @@ pub fn init_logging(
352360
EnvFilterBuilder::new()
353361
.filter(Some("databend::log::profile"), LevelFilter::Trace),
354362
))
363+
.filter(filter_by_thread_tracker())
355364
.append(otel);
356365
logger = logger.dispatch(dispatch);
357366
}
@@ -372,6 +381,7 @@ pub fn init_logging(
372381
EnvFilterBuilder::new()
373382
.filter(Some("databend::log::structlog"), LevelFilter::Trace),
374383
))
384+
.filter(filter_by_thread_tracker())
375385
.append(structlog_log_file);
376386
logger = logger.dispatch(dispatch);
377387
}
@@ -400,6 +410,7 @@ pub fn init_logging(
400410
.filter(EnvFilter::new(
401411
filter_builder.parse(&cfg.persistentlog.level),
402412
))
413+
.filter(filter_by_thread_tracker())
403414
.append(remote_log);
404415

405416
logger = logger.dispatch(dispatch);

src/common/tracing/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
mod config;
2121
mod crash_hook;
22+
mod filter;
2223
mod init;
2324
mod loggers;
2425
mod panic_hook;

src/query/config/src/config.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,6 +2556,9 @@ pub struct PersistentLogConfig {
25562556
pub log_persistentlog_interval: usize,
25572557

25582558
/// Specifies the name of the staging area that temporarily holds log data before it is finally copied into the table
2559+
///
2560+
/// Note:
2561+
/// The default value uses a UUID to avoid conflicts with existing stages
25592562
#[clap(
25602563
long = "log-persistentlog-stage-name",
25612564
value_name = "VALUE",
@@ -2564,7 +2567,17 @@ pub struct PersistentLogConfig {
25642567
#[serde(rename = "stage_name")]
25652568
pub log_persistentlog_stage_name: String,
25662569

2567-
/// Specifies how long the persistent log should be retained, in hours
2570+
/// Log level <DEBUG|INFO|WARN|ERROR>
2571+
#[clap(
2572+
long = "log-persistentlog-level",
2573+
value_name = "VALUE",
2574+
default_value = "WARN"
2575+
)]
2576+
#[serde(rename = "level")]
2577+
pub log_persistentlog_level: String,
2578+
2579+
/// The retention period (in hours) for persistent logs.
2580+
/// Data older than this period will be deleted during retention tasks.
25682581
#[clap(
25692582
long = "log-persistentlog-retention",
25702583
value_name = "VALUE",
@@ -2573,14 +2586,15 @@ pub struct PersistentLogConfig {
25732586
#[serde(rename = "retention")]
25742587
pub log_persistentlog_retention: usize,
25752588

2576-
/// Log level <DEBUG|INFO|WARN|ERROR>
2589+
/// The interval (in hours) at which the retention process is triggered.
2590+
/// Specifies how often the retention task runs to clean up old data.
25772591
#[clap(
2578-
long = "log-persistentlog-level",
2592+
long = "log-persistentlog-retention-interval",
25792593
value_name = "VALUE",
2580-
default_value = "WARN"
2594+
default_value = "24"
25812595
)]
2582-
#[serde(rename = "level")]
2583-
pub log_persistentlog_level: String,
2596+
#[serde(rename = "retention_interval")]
2597+
pub log_persistentlog_retention_interval: usize,
25842598
}
25852599

25862600
impl Default for PersistentLogConfig {
@@ -2599,6 +2613,7 @@ impl TryInto<InnerPersistentLogConfig> for PersistentLogConfig {
25992613
stage_name: self.log_persistentlog_stage_name,
26002614
level: self.log_persistentlog_level,
26012615
retention: self.log_persistentlog_retention,
2616+
retention_interval: self.log_persistentlog_retention_interval,
26022617
})
26032618
}
26042619
}
@@ -2611,6 +2626,7 @@ impl From<InnerPersistentLogConfig> for PersistentLogConfig {
26112626
log_persistentlog_stage_name: inner.stage_name,
26122627
log_persistentlog_level: inner.level,
26132628
log_persistentlog_retention: inner.retention,
2629+
log_persistentlog_retention_interval: inner.retention_interval,
26142630
}
26152631
}
26162632
}

0 commit comments

Comments
 (0)