Skip to content

Commit 5e0ae93

Browse files
committed
Merge remote-tracking branch 'up/main' into range-shuffle
2 parents 2c82187 + eb671da commit 5e0ae93

File tree

17 files changed

+439
-344
lines changed

17 files changed

+439
-344
lines changed

src/common/base/src/runtime/runtime_tracker.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ pub struct TrackingPayload {
103103
pub profile: Option<Arc<Profile>>,
104104
pub mem_stat: Option<Arc<MemStat>>,
105105
pub metrics: Option<Arc<ScopedRegistry>>,
106+
pub should_log: bool,
106107
}
107108

108109
pub struct TrackingGuard {
@@ -163,6 +164,7 @@ impl ThreadTracker {
163164
metrics: None,
164165
mem_stat: None,
165166
query_id: None,
167+
should_log: true,
166168
},
167169
}
168170
}
@@ -243,6 +245,10 @@ impl ThreadTracker {
243245
.map(|query_id| unsafe { &*(query_id as *const String) })
244246
})
245247
}
248+
249+
pub fn should_log() -> bool {
250+
TRACKER.with(|tracker| tracker.borrow().payload.should_log)
251+
}
246252
}
247253

248254
pin_project! {

src/common/tracing/src/config.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,14 +342,20 @@ pub struct PersistentLogConfig {
342342
pub stage_name: String,
343343
pub level: String,
344344
pub retention: usize,
345+
pub retention_interval: usize,
345346
}
346347

347348
impl Display for PersistentLogConfig {
348349
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
349350
write!(
350351
f,
351-
"enabled={}, interval={}, stage_name={}, level={}, retention={}",
352-
self.on, self.interval, self.stage_name, self.level, self.retention
352+
"enabled={}, interval={}, stage_name={}, level={}, retention={}, retention_interval={}",
353+
self.on,
354+
self.interval,
355+
self.stage_name,
356+
self.level,
357+
self.retention,
358+
self.retention_interval
353359
)
354360
}
355361
}
@@ -359,9 +365,13 @@ impl Default for PersistentLogConfig {
359365
Self {
360366
on: false,
361367
interval: 2,
368+
// The default value of stage name uses an uuid to avoid conflicts with existing stages
362369
stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
363-
retention: 72,
364370
level: "WARN".to_string(),
371+
// Data older than 72 hours will be deleted during retention tasks
372+
retention: 72,
373+
// Trigger the retention task every 24 hours
374+
retention_interval: 24,
365375
}
366376
}
367377
}

src/common/tracing/src/filter.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_base::runtime::ThreadTracker;
16+
use logforth::filter::CustomFilter;
17+
use logforth::filter::FilterResult;
18+
19+
pub fn filter_by_thread_tracker() -> CustomFilter {
20+
CustomFilter::new(|_metadata| match ThreadTracker::should_log() {
21+
true => FilterResult::Neutral,
22+
false => FilterResult::Reject,
23+
})
24+
}

src/common/tracing/src/init.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use opendal::Operator;
3131
use opentelemetry_otlp::WithExportConfig;
3232

3333
use crate::config::OTLPProtocol;
34+
use crate::filter::filter_by_thread_tracker;
3435
use crate::loggers::get_layout;
3536
use crate::loggers::new_rolling_file_appender;
3637
use crate::remote_log::RemoteLog;
@@ -219,6 +220,7 @@ pub fn init_logging(
219220

220221
let dispatch = Dispatch::new()
221222
.filter(env_filter(&cfg.file.level))
223+
.filter(filter_by_thread_tracker())
222224
.append(normal_log_file.with_layout(get_layout(&cfg.file.format)));
223225
logger = logger.dispatch(dispatch);
224226
}
@@ -227,6 +229,7 @@ pub fn init_logging(
227229
if cfg.stderr.on {
228230
let dispatch = Dispatch::new()
229231
.filter(env_filter(&cfg.stderr.level))
232+
.filter(filter_by_thread_tracker())
230233
.append(
231234
logforth::append::Stderr::default().with_layout(get_layout(&cfg.stderr.format)),
232235
);
@@ -253,6 +256,7 @@ pub fn init_logging(
253256
.expect("initialize opentelemetry logger");
254257
let dispatch = Dispatch::new()
255258
.filter(env_filter(&cfg.otlp.level))
259+
.filter(filter_by_thread_tracker())
256260
.append(otel);
257261
logger = logger.dispatch(dispatch);
258262
}
@@ -261,6 +265,7 @@ pub fn init_logging(
261265
if cfg.tracing.on || cfg.structlog.on {
262266
let dispatch = Dispatch::new()
263267
.filter(env_filter(&cfg.tracing.capture_log_level))
268+
.filter(filter_by_thread_tracker())
264269
.append(logforth::append::FastraceEvent::default());
265270
logger = logger.dispatch(dispatch);
266271
}
@@ -281,6 +286,7 @@ pub fn init_logging(
281286
EnvFilterBuilder::new()
282287
.filter(Some("databend::log::query"), LevelFilter::Trace),
283288
))
289+
.filter(filter_by_thread_tracker())
284290
.append(query_log_file.with_layout(get_layout("identical")));
285291
logger = logger.dispatch(dispatch);
286292
}
@@ -306,6 +312,7 @@ pub fn init_logging(
306312
EnvFilterBuilder::new()
307313
.filter(Some("databend::log::query"), LevelFilter::Trace),
308314
))
315+
.filter(filter_by_thread_tracker())
309316
.append(otel);
310317
logger = logger.dispatch(dispatch);
311318
}
@@ -327,6 +334,7 @@ pub fn init_logging(
327334
EnvFilterBuilder::new()
328335
.filter(Some("databend::log::profile"), LevelFilter::Trace),
329336
))
337+
.filter(filter_by_thread_tracker())
330338
.append(profile_log_file.with_layout(get_layout("identical")));
331339
logger = logger.dispatch(dispatch);
332340
}
@@ -352,6 +360,7 @@ pub fn init_logging(
352360
EnvFilterBuilder::new()
353361
.filter(Some("databend::log::profile"), LevelFilter::Trace),
354362
))
363+
.filter(filter_by_thread_tracker())
355364
.append(otel);
356365
logger = logger.dispatch(dispatch);
357366
}
@@ -372,6 +381,7 @@ pub fn init_logging(
372381
EnvFilterBuilder::new()
373382
.filter(Some("databend::log::structlog"), LevelFilter::Trace),
374383
))
384+
.filter(filter_by_thread_tracker())
375385
.append(structlog_log_file);
376386
logger = logger.dispatch(dispatch);
377387
}
@@ -400,6 +410,7 @@ pub fn init_logging(
400410
.filter(EnvFilter::new(
401411
filter_builder.parse(&cfg.persistentlog.level),
402412
))
413+
.filter(filter_by_thread_tracker())
403414
.append(remote_log);
404415

405416
logger = logger.dispatch(dispatch);

src/common/tracing/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
mod config;
2121
mod crash_hook;
22+
mod filter;
2223
mod init;
2324
mod loggers;
2425
mod panic_hook;

src/query/ast/src/parser/parser.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ pub fn run_parser<O>(
123123
} else {
124124
Err(ParseError(
125125
transform_span(&rest[..1]),
126-
"unable to parse rest of the sql".to_string(),
126+
format!(
127+
"unable to parse rest of the sql, rest tokens: {:?} ",
128+
rest.tokens
129+
),
127130
))
128131
}
129132
}

src/query/config/src/config.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,6 +2556,9 @@ pub struct PersistentLogConfig {
25562556
pub log_persistentlog_interval: usize,
25572557

25582558
/// Specifies the name of the staging area that temporarily holds log data before it is finally copied into the table
2559+
///
2560+
/// Note:
2561+
/// The default value uses an uuid to avoid conflicts with existing stages
25592562
#[clap(
25602563
long = "log-persistentlog-stage-name",
25612564
value_name = "VALUE",
@@ -2564,7 +2567,17 @@ pub struct PersistentLogConfig {
25642567
#[serde(rename = "stage_name")]
25652568
pub log_persistentlog_stage_name: String,
25662569

2567-
/// Specifies how long the persistent log should be retained, in hours
2570+
/// Log level <DEBUG|INFO|WARN|ERROR>
2571+
#[clap(
2572+
long = "log-persistentlog-level",
2573+
value_name = "VALUE",
2574+
default_value = "WARN"
2575+
)]
2576+
#[serde(rename = "level")]
2577+
pub log_persistentlog_level: String,
2578+
2579+
/// The retention period (in hours) for persistent logs.
2580+
/// Data older than this period will be deleted during retention tasks.
25682581
#[clap(
25692582
long = "log-persistentlog-retention",
25702583
value_name = "VALUE",
@@ -2573,14 +2586,15 @@ pub struct PersistentLogConfig {
25732586
#[serde(rename = "retention")]
25742587
pub log_persistentlog_retention: usize,
25752588

2576-
/// Log level <DEBUG|INFO|WARN|ERROR>
2589+
/// The interval (in hours) at which the retention process is triggered.
2590+
/// Specifies how often the retention task runs to clean up old data.
25772591
#[clap(
2578-
long = "log-persistentlog-level",
2592+
long = "log-persistentlog-retention-interval",
25792593
value_name = "VALUE",
2580-
default_value = "WARN"
2594+
default_value = "24"
25812595
)]
2582-
#[serde(rename = "level")]
2583-
pub log_persistentlog_level: String,
2596+
#[serde(rename = "retention_interval")]
2597+
pub log_persistentlog_retention_interval: usize,
25842598
}
25852599

25862600
impl Default for PersistentLogConfig {
@@ -2599,6 +2613,7 @@ impl TryInto<InnerPersistentLogConfig> for PersistentLogConfig {
25992613
stage_name: self.log_persistentlog_stage_name,
26002614
level: self.log_persistentlog_level,
26012615
retention: self.log_persistentlog_retention,
2616+
retention_interval: self.log_persistentlog_retention_interval,
26022617
})
26032618
}
26042619
}
@@ -2611,6 +2626,7 @@ impl From<InnerPersistentLogConfig> for PersistentLogConfig {
26112626
log_persistentlog_stage_name: inner.stage_name,
26122627
log_persistentlog_level: inner.level,
26132628
log_persistentlog_retention: inner.retention,
2629+
log_persistentlog_retention_interval: inner.retention_interval,
26142630
}
26152631
}
26162632
}

src/query/expression/src/utils/display.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::fmt::Write;
2020
use comfy_table::Cell;
2121
use comfy_table::Table;
2222
use databend_common_ast::ast::quote::display_ident;
23+
use databend_common_ast::ast::quote::QuotedString;
2324
use databend_common_ast::parser::Dialect;
2425
use databend_common_column::binary::BinaryColumn;
2526
use databend_common_io::deserialize_bitmap;
@@ -244,7 +245,7 @@ impl Display for ScalarRef<'_> {
244245
}
245246
Ok(())
246247
}
247-
ScalarRef::String(s) => write!(f, "'{s}'"),
248+
ScalarRef::String(s) => write!(f, "{}", QuotedString(s, '\'')),
248249
ScalarRef::Timestamp(t) => write!(f, "'{}'", timestamp_to_string(*t, &TimeZone::UTC)),
249250
ScalarRef::Date(d) => write!(f, "'{}'", date_to_string(*d as i64, &TimeZone::UTC)),
250251
ScalarRef::Interval(interval) => write!(f, "'{}'", interval_to_string(interval)),
@@ -282,19 +283,19 @@ impl Display for ScalarRef<'_> {
282283
ScalarRef::Variant(s) => {
283284
let raw_jsonb = RawJsonb::new(s);
284285
let value = raw_jsonb.to_string();
285-
write!(f, "'{value}'")
286+
write!(f, "{}", QuotedString(value, '\''))
286287
}
287288
ScalarRef::Geometry(s) => {
288289
let geom = ewkb_to_geo(&mut Ewkb(s))
289290
.and_then(|(geo, srid)| geo_to_ewkt(geo, srid))
290291
.unwrap_or_else(|e| format!("GeozeroError: {:?}", e));
291-
write!(f, "'{geom}'")
292+
write!(f, "{}", QuotedString(geom, '\''))
292293
}
293294
ScalarRef::Geography(v) => {
294295
let geog = ewkb_to_geo(&mut Ewkb(v.0))
295296
.and_then(|(geo, srid)| geo_to_ewkt(geo, srid))
296297
.unwrap_or_else(|e| format!("GeozeroError: {:?}", e));
297-
write!(f, "'{geog}'")
298+
write!(f, "{}", QuotedString(geog, '\''))
298299
}
299300
}
300301
}

src/query/functions/tests/it/scalars/testdata/comparison.txt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,7 @@ evaluation:
11721172
| Row 1 | '-32768' | '-32768' | true |
11731173
| Row 2 | '1234.5678' | '1234.5678' | true |
11741174
| Row 3 | '1.912e2' | '1.912e2' | true |
1175-
| Row 4 | '"\\\"abc\\\""' | '"\\\"abc\\\""' | true |
1175+
| Row 4 | '"\\\\\\"abc\\\\\\""' | '"\\\\\\"abc\\\\\\""' | true |
11761176
| Row 5 | '{"k":"v","a":"b"}' | '{"k":"v","a":"d"}' | false |
11771177
| Row 6 | '[1,2,3,["a","b","d"]]' | '[1,2,3,["a","b","c"]]' | true |
11781178
+--------+-----------------------------------------------------------+-----------------------------------------------------------+---------+
@@ -1199,7 +1199,7 @@ evaluation:
11991199
| Row 1 | '-32768' | '-32768' | true |
12001200
| Row 2 | '1234.5678' | '1234.5678' | true |
12011201
| Row 3 | '1.912e2' | '1.912e2' | true |
1202-
| Row 4 | '"\\\"abc\\\""' | '"\\\"abc\\\""' | true |
1202+
| Row 4 | '"\\\\\\"abc\\\\\\""' | '"\\\\\\"abc\\\\\\""' | true |
12031203
| Row 5 | '{"k":"v","a":"b"}' | '{"k":"v","a":"d"}' | false |
12041204
| Row 6 | '[1,2,3,["a","b","d"]]' | '[1,2,3,["a","b","c"]]' | true |
12051205
+--------+-----------------------------------------------------------+-----------------------------------------------------------+---------------+
@@ -1224,8 +1224,7 @@ output : false
12241224

12251225
ast : 'hello
12261226
' like 'h%'
1227-
raw expr : like('hello
1228-
', 'h%')
1227+
raw expr : like('hello\n', 'h%')
12291228
checked expr : like<String, String>("hello\n", "h%")
12301229
optimized expr : true
12311230
output type : Boolean
@@ -1235,8 +1234,7 @@ output : true
12351234

12361235
ast : 'h
12371236
' like 'h_'
1238-
raw expr : like('h
1239-
', 'h_')
1237+
raw expr : like('h\n', 'h_')
12401238
checked expr : like<String, String>("h\n", "h_")
12411239
optimized expr : true
12421240
output type : Boolean
@@ -1245,7 +1243,7 @@ output : true
12451243

12461244

12471245
ast : '%' like '\%'
1248-
raw expr : like('%', '\%')
1246+
raw expr : like('%', '\\%')
12491247
checked expr : like<String, String>("%", "\\%")
12501248
optimized expr : true
12511249
output type : Boolean
@@ -1254,7 +1252,7 @@ output : true
12541252

12551253

12561254
ast : 'v%xx' like '_\%%'
1257-
raw expr : like('v%xx', '_\%%')
1255+
raw expr : like('v%xx', '_\\%%')
12581256
checked expr : like<String, String>("v%xx", "_\\%%")
12591257
optimized expr : true
12601258
output type : Boolean

0 commit comments

Comments
 (0)