Skip to content

Commit 98167cb

Browse files
authored
fix: otlp logger runtime for graceful shutdown (#14539)
fix: otlp logger flush
1 parent 8ce71df commit 98167cb

File tree

4 files changed

+38
-18
lines changed

4 files changed

+38
-18
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/tracing/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ databend-common-base = { path = "../base" }
1919

2020
# Crates.io dependencies
2121
console-subscriber = { version = "0.2.0", optional = true }
22-
defer = "0.1"
22+
defer = "0.2"
2323
fern = "0.6.2"
2424
humantime = "2.1.0"
2525
itertools = { workspace = true }

src/common/tracing/src/loggers.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
use std::collections::BTreeMap;
1616
use std::fmt;
1717
use std::io::BufWriter;
18+
use std::sync::Arc;
1819
use std::time::Duration;
1920
use std::time::SystemTime;
2021

2122
use fern::FormatCallback;
2223
use opentelemetry::logs::AnyValue;
23-
use opentelemetry::logs::Logger;
24-
use opentelemetry::logs::LoggerProvider;
2524
use opentelemetry::logs::Severity;
25+
use opentelemetry::InstrumentationLibrary;
2626
use opentelemetry_otlp::WithExportConfig;
2727
use serde_json::Map;
2828
use tracing_appender::non_blocking::NonBlocking;
@@ -81,9 +81,9 @@ impl log::Log for MinitraceLogger {
8181
fn flush(&self) {}
8282
}
8383

84+
#[derive(Debug)]
8485
pub(crate) struct OpenTelemetryLogger {
85-
logger: opentelemetry_sdk::logs::Logger,
86-
// keep provider alive
86+
library: Arc<InstrumentationLibrary>,
8787
provider: opentelemetry_sdk::logs::LoggerProvider,
8888
}
8989

@@ -111,14 +111,23 @@ impl OpenTelemetryLogger {
111111
.build_log_exporter()
112112
.expect("build log exporter");
113113
let provider = opentelemetry_sdk::logs::LoggerProvider::builder()
114-
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
114+
.with_simple_exporter(exporter)
115115
.with_config(
116116
opentelemetry_sdk::logs::Config::default()
117117
.with_resource(opentelemetry_sdk::Resource::new(kvs)),
118118
)
119119
.build();
120-
let logger = provider.versioned_logger(name.to_string(), None, None, None);
121-
Self { logger, provider }
120+
let library = Arc::new(InstrumentationLibrary::new(
121+
name.to_string(),
122+
None::<&str>,
123+
None::<&str>,
124+
None,
125+
));
126+
Self { library, provider }
127+
}
128+
129+
pub fn instrumentation_library(&self) -> &InstrumentationLibrary {
130+
&self.library
122131
}
123132
}
124133

@@ -128,13 +137,24 @@ impl log::Log for OpenTelemetryLogger {
128137
true
129138
}
130139

131-
fn log(&self, record: &log::Record<'_>) {
140+
fn log(&self, log_record: &log::Record<'_>) {
141+
let provider = self.provider.clone();
142+
let config = provider.config();
132143
let builder = opentelemetry::logs::LogRecord::builder()
133144
.with_observed_timestamp(SystemTime::now())
134-
.with_severity_number(map_severity_to_otel_severity(record.level()))
135-
.with_severity_text(record.level().as_str())
136-
.with_body(AnyValue::from(record.args().to_string()));
137-
self.logger.emit(builder.build())
145+
.with_severity_number(map_severity_to_otel_severity(log_record.level()))
146+
.with_severity_text(log_record.level().as_str())
147+
.with_body(AnyValue::from(log_record.args().to_string()));
148+
let record = builder.build();
149+
for processor in provider.log_processors() {
150+
let record = record.clone();
151+
let data = opentelemetry_sdk::export::logs::LogData {
152+
record,
153+
resource: config.resource.clone(),
154+
instrumentation: self.instrumentation_library().clone(),
155+
};
156+
processor.emit(data);
157+
}
138158
}
139159

140160
fn flush(&self) {

src/query/service/src/servers/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl ShutdownHandle {
5858
let mut shutdown_jobs = vec![];
5959
for (name, service) in &mut self.services {
6060
shutdown_jobs.push(async move {
61-
info!("Stop {} service", name);
61+
info!("Stopping {} service", name);
6262
service.shutdown(graceful).await;
6363
info!("Stopped {} service", name);
6464
});

0 commit comments

Comments
 (0)