Skip to content

Commit 8aa5b00

Browse files
authored
feat: Add experimental concurrent processor for logs (#2780)
1 parent 52cd0e9 commit 8aa5b00

File tree

7 files changed

+185
-30
lines changed

7 files changed

+185
-30
lines changed

opentelemetry-sdk/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ internal-logs = ["tracing"]
5656
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
5757
spec_unstable_metrics_views = ["metrics"]
5858
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
59+
experimental_logs_concurrent_log_processor = ["logs"]
5960
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]
6061
experimental_metrics_disable_name_validation = ["metrics"]
6162

@@ -88,6 +89,11 @@ harness = false
8889
name = "log_processor"
8990
harness = false
9091

92+
[[bench]]
93+
name = "log_enabled"
94+
harness = false
95+
required-features = ["spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"]
96+
9197
[[bench]]
9298
name = "tracer_creation"
9399
harness = false
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
The benchmark results:
3+
criterion = "0.5.1"
4+
Hardware: Apple M4 Pro
5+
Total Number of Cores:   14 (10 performance and 4 efficiency)
6+
| Test | Average time|
7+
|---------------------------------------------|-------------|
8+
| exporter_disabled_concurrent_processor | 1.9 ns |
9+
| exporter_disabled_simple_processor | 5.0 ns |
10+
*/
11+
12+
use criterion::{criterion_group, criterion_main, Criterion};
13+
use opentelemetry::logs::{Logger, LoggerProvider};
14+
use opentelemetry_sdk::error::OTelSdkResult;
15+
use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor;
16+
use opentelemetry_sdk::logs::{
17+
LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor,
18+
};
19+
use opentelemetry_sdk::Resource;
20+
#[cfg(not(target_os = "windows"))]
21+
use pprof::criterion::{Output, PProfProfiler};
22+
23+
#[derive(Debug)]
24+
struct NoopExporter;
25+
impl LogExporter for NoopExporter {
26+
async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult {
27+
Ok(())
28+
}
29+
30+
fn shutdown(&self) -> OTelSdkResult {
31+
Ok(())
32+
}
33+
34+
fn event_enabled(
35+
&self,
36+
_level: opentelemetry::logs::Severity,
37+
_target: &str,
38+
_name: Option<&str>,
39+
) -> bool {
40+
false
41+
}
42+
43+
fn set_resource(&mut self, _: &Resource) {}
44+
}
45+
46+
fn benchmark_exporter_enabled_false<T>(c: &mut Criterion, name: &str, processor: T)
47+
where
48+
T: LogProcessor + Send + Sync + 'static,
49+
{
50+
let provider = SdkLoggerProvider::builder()
51+
.with_log_processor(processor)
52+
.build();
53+
let logger = provider.logger("test_logger");
54+
55+
c.bench_function(name, |b| {
56+
b.iter(|| {
57+
logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name"));
58+
});
59+
});
60+
}
61+
62+
fn criterion_benchmark(c: &mut Criterion) {
63+
let processor = SimpleConcurrentLogProcessor::new(NoopExporter);
64+
benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor);
65+
let simple = SimpleLogProcessor::new(NoopExporter);
66+
benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple);
67+
}
68+
69+
#[cfg(not(target_os = "windows"))]
70+
criterion_group! {
71+
name = benches;
72+
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
73+
targets = criterion_benchmark
74+
}
75+
#[cfg(target_os = "windows")]
76+
criterion_group! {
77+
name = benches;
78+
config = Criterion::default();
79+
targets = criterion_benchmark
80+
}
81+
criterion_main!(benches);
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use opentelemetry::{otel_info, InstrumentationScope};
2+
3+
use crate::error::OTelSdkResult;
4+
5+
use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord};
6+
7+
/// A concurrent log processor calls exporter's export method on each emit. This
8+
/// processor does not buffer logs. Note: This invokes exporter's export method
9+
/// on the current thread without synchronization. i.e multiple export() calls
10+
/// can happen simultaneously from different threads. This is not a problem if
11+
/// the exporter is designed to handle that. As of now, exporters in the
12+
/// opentelemetry-rust project (stdout/otlp) are not thread-safe.
13+
/// This is intended to be used when exporting to operating system
14+
/// tracing facilities like Windows ETW, Linux TracePoints etc.
15+
#[derive(Debug)]
16+
pub struct SimpleConcurrentLogProcessor<T: LogExporter> {
17+
exporter: T,
18+
}
19+
20+
impl<T: LogExporter> SimpleConcurrentLogProcessor<T> {
21+
/// Creates a new `ConcurrentExportProcessor` with the given exporter.
22+
pub fn new(exporter: T) -> Self {
23+
Self { exporter }
24+
}
25+
}
26+
27+
impl<T: LogExporter> LogProcessor for SimpleConcurrentLogProcessor<T> {
28+
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
29+
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
30+
let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
31+
if let Err(err) = result {
32+
otel_info!(
33+
name: "SimpleConcurrentLogProcessor.Emit.ExportError",
34+
error = format!("{}",err)
35+
);
36+
}
37+
}
38+
39+
fn force_flush(&self) -> OTelSdkResult {
40+
// TODO: invoke flush on exporter
41+
// once https://github.com/open-telemetry/opentelemetry-rust/issues/2261
42+
// is resolved
43+
Ok(())
44+
}
45+
46+
fn shutdown(&self) -> OTelSdkResult {
47+
self.exporter.shutdown()
48+
}
49+
50+
#[cfg(feature = "spec_unstable_logs_enabled")]
51+
fn event_enabled(
52+
&self,
53+
level: opentelemetry::logs::Severity,
54+
target: &str,
55+
name: Option<&str>,
56+
) -> bool {
57+
self.exporter.event_enabled(level, target, name)
58+
}
59+
}

opentelemetry-sdk/src/logs/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ pub use logger_provider::{LoggerProviderBuilder, SdkLoggerProvider};
2727
pub use record::{SdkLogRecord, TraceContext};
2828
pub use simple_log_processor::SimpleLogProcessor;
2929

30+
#[cfg(feature = "experimental_logs_concurrent_log_processor")]
31+
/// Module for ConcurrentLogProcessor.
32+
pub mod concurrent_log_processor;
33+
3034
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
3135
/// Module for BatchLogProcessor with async runtime.
3236
pub mod log_processor_with_async_runtime;

opentelemetry-sdk/src/logs/simple_log_processor.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ pub struct SimpleLogProcessor<T: LogExporter> {
6565
}
6666

6767
impl<T: LogExporter> SimpleLogProcessor<T> {
68-
pub(crate) fn new(exporter: T) -> Self {
68+
/// Creates a new instance of `SimpleLogProcessor`.
69+
pub fn new(exporter: T) -> Self {
6970
SimpleLogProcessor {
7071
exporter: Mutex::new(exporter),
7172
is_shutdown: AtomicBool::new(false),
@@ -131,6 +132,20 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
131132
exporter.set_resource(resource);
132133
}
133134
}
135+
136+
#[cfg(feature = "spec_unstable_logs_enabled")]
137+
fn event_enabled(
138+
&self,
139+
level: opentelemetry::logs::Severity,
140+
target: &str,
141+
name: Option<&str>,
142+
) -> bool {
143+
if let Ok(exporter) = self.exporter.lock() {
144+
exporter.event_enabled(level, target, name)
145+
} else {
146+
true
147+
}
148+
}
134149
}
135150

136151
#[cfg(all(test, feature = "testing", feature = "logs"))]

stress/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ ctrlc = { workspace = true }
4444
lazy_static = { workspace = true }
4545
num_cpus = { workspace = true }
4646
opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] }
47-
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] }
47+
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"] }
4848
opentelemetry-appender-tracing = { workspace = true, features = ["spec_unstable_logs_enabled"] }
4949
rand = { workspace = true, features = ["small_rng", "os_rng"] }
5050
tracing = { workspace = true, features = ["std"]}

stress/src/logs.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,45 +12,34 @@
1212
Total Number of Cores: 14 (10 performance and 4 efficiency)
1313
~50 M/sec
1414
~1.1 B/sec (when disabled)
15-
*/
1615
17-
use opentelemetry::InstrumentationScope;
16+
With existing SimpleLogProcessor:
17+
3 M/sec (when enabled) (.with_log_processor(SimpleLogProcessor::new(NoopExporter::new(true))))
18+
26 M/sec (when disabled) (.with_log_processor(SimpleLogProcessor::new(NoopExporter::new(false)))
19+
*/
1820
use opentelemetry_appender_tracing::layer;
1921
use opentelemetry_sdk::error::OTelSdkResult;
22+
use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor;
23+
use opentelemetry_sdk::logs::SdkLoggerProvider;
2024
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
21-
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider};
2225

26+
use opentelemetry_sdk::Resource;
2327
use tracing::error;
2428
use tracing_subscriber::prelude::*;
2529

2630
mod throughput;
2731

28-
#[derive(Debug, Clone)]
29-
struct MockLogExporter;
30-
31-
impl LogExporter for MockLogExporter {
32-
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
33-
Ok(())
34-
}
35-
}
36-
3732
#[derive(Debug)]
38-
pub struct MockLogProcessor {
39-
exporter: MockLogExporter,
33+
struct NoopExporter {
4034
enabled: bool,
4135
}
42-
43-
impl LogProcessor for MockLogProcessor {
44-
fn emit(
45-
&self,
46-
record: &mut opentelemetry_sdk::logs::SdkLogRecord,
47-
scope: &InstrumentationScope,
48-
) {
49-
let log_tuple = &[(record as &SdkLogRecord, scope)];
50-
let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
36+
impl NoopExporter {
37+
fn new(enabled: bool) -> Self {
38+
Self { enabled }
5139
}
52-
53-
fn force_flush(&self) -> OTelSdkResult {
40+
}
41+
impl LogExporter for NoopExporter {
42+
async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult {
5443
Ok(())
5544
}
5645

@@ -66,6 +55,8 @@ impl LogProcessor for MockLogProcessor {
6655
) -> bool {
6756
self.enabled
6857
}
58+
59+
fn set_resource(&mut self, _: &Resource) {}
6960
}
7061

7162
fn main() {
@@ -74,10 +65,9 @@ fn main() {
7465

7566
// LoggerProvider with a no-op processor.
7667
let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
77-
.with_log_processor(MockLogProcessor {
78-
exporter: MockLogExporter {},
68+
.with_log_processor(SimpleConcurrentLogProcessor::new(NoopExporter::new(
7969
enabled,
80-
})
70+
)))
8171
.build();
8272

8373
// Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.

0 commit comments

Comments
 (0)