Skip to content

Commit 974e552

Browse files
committed
fix!: use TraceProvider::flush_force() during test
1 parent 4a321b0 commit 974e552

File tree

2 files changed

+26
-15
lines changed
  • fake-opentelemetry-collector/src
  • testing-tracing-opentelemetry/src

2 files changed

+26
-15
lines changed

fake-opentelemetry-collector/src/lib.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::net::SocketAddr;
1111
use std::time::{Duration, Instant};
1212

1313
use futures::StreamExt;
14-
use opentelemetry::trace::TracerProvider;
1514
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig};
1615
use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
1716
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
@@ -88,14 +87,16 @@ impl FakeCollectorServer {
8887

8988
async fn recv_many<T>(rx: &mut Receiver<T>, at_least: usize, timeout: Duration) -> Vec<T> {
9089
let deadline = Instant::now();
91-
let pause = (timeout / 5).min(Duration::from_millis(500));
90+
let pause = (timeout / 10).min(Duration::from_millis(10));
9291
while rx.len() < at_least && deadline.elapsed() < timeout {
9392
tokio::time::sleep(pause).await;
9493
}
9594
std::iter::from_fn(|| rx.try_recv().ok()).collect::<Vec<_>>()
9695
}
9796

98-
pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sdk::trace::Tracer {
97+
pub async fn setup_tracer_provider(
98+
fake_server: &FakeCollectorServer,
99+
) -> opentelemetry_sdk::trace::TracerProvider {
99100
// if the environment variable is set (in test or in caller), `with_endpoint` value is ignored
100101
std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT");
101102

@@ -109,10 +110,9 @@ pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sd
109110
opentelemetry_sdk::runtime::Tokio,
110111
)
111112
.build()
112-
.tracer("")
113113
}
114114

115-
pub async fn setup_logger(
115+
pub async fn setup_logger_provider(
116116
fake_server: &FakeCollectorServer,
117117
) -> opentelemetry_sdk::logs::LoggerProvider {
118118
opentelemetry_sdk::logs::LoggerProvider::builder()
@@ -131,16 +131,17 @@ pub async fn setup_logger(
131131
mod tests {
132132
use super::*;
133133

134-
use opentelemetry::global::shutdown_tracer_provider;
135134
use opentelemetry::logs::{LogRecord, Logger, LoggerProvider, Severity};
135+
use opentelemetry::trace::TracerProvider;
136136
use opentelemetry::trace::{Span, SpanKind, Tracer};
137137

138138
#[tokio::test(flavor = "multi_thread")]
139139
async fn test_fake_tracer_and_collector() {
140140
let mut fake_collector = FakeCollectorServer::start()
141141
.await
142142
.expect("fake collector setup and started");
143-
let tracer = setup_tracer(&fake_collector).await;
143+
let tracer_provider = setup_tracer_provider(&fake_collector).await;
144+
let tracer = tracer_provider.tracer("test");
144145

145146
debug!("Sending span...");
146147
let mut span = tracer
@@ -149,7 +150,12 @@ mod tests {
149150
.start(&tracer);
150151
span.add_event("my-test-event", vec![]);
151152
span.end();
152-
shutdown_tracer_provider();
153+
154+
let _ = tracer_provider.force_flush();
155+
tracer_provider
156+
.shutdown()
157+
.expect("no error during shutdown");
158+
drop(tracer_provider);
153159

154160
let otel_spans = fake_collector
155161
.exported_spans(1, Duration::from_secs(20))
@@ -184,7 +190,7 @@ mod tests {
184190
.await
185191
.expect("fake collector setup and started");
186192

187-
let logger_provider = setup_logger(&fake_collector).await;
193+
let logger_provider = setup_logger_provider(&fake_collector).await;
188194
let logger = logger_provider.logger("test");
189195
let mut record = logger.create_log_record();
190196
record.set_body("This is information".into());

testing-tracing-opentelemetry/src/lib.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use assert2::{check, let_assert};
2+
use opentelemetry::trace::TracerProvider;
23
use opentelemetry_sdk::propagation::TraceContextPropagator;
34
use serde_json::Value;
45
use std::sync::mpsc::{self, Receiver, SyncSender};
5-
66
use tracing_subscriber::{
77
fmt::{format::FmtSpan, MakeWriter},
88
util::SubscriberInitExt,
@@ -88,6 +88,7 @@ pub struct FakeEnvironment {
8888
fake_collector: fake_opentelemetry_collector::FakeCollectorServer,
8989
rx: Receiver<Vec<u8>>,
9090
_subsciber_guard: tracing::subscriber::DefaultGuard,
91+
tracer_provider: opentelemetry_sdk::trace::TracerProvider,
9192
}
9293

9394
impl FakeEnvironment {
@@ -100,10 +101,11 @@ impl FakeEnvironment {
100101
let fake_collector = fake_opentelemetry_collector::FakeCollectorServer::start()
101102
.await
102103
.unwrap();
103-
let tracer = fake_opentelemetry_collector::setup_tracer(&fake_collector).await;
104+
let tracer_provider =
105+
fake_opentelemetry_collector::setup_tracer_provider(&fake_collector).await;
104106
//let (tracer, mut req_rx) = fake_opentelemetry_collector::setup_tracer().await;
105107
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
106-
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
108+
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("fake"));
107109

108110
let (make_writer, rx) = duplex_writer();
109111
let fmt_layer = tracing_subscriber::fmt::layer()
@@ -119,21 +121,24 @@ impl FakeEnvironment {
119121
fake_collector,
120122
rx,
121123
_subsciber_guard,
124+
tracer_provider,
122125
}
123126
}
124127

125128
pub async fn collect_traces(
126129
&mut self,
127130
) -> (Vec<Value>, Vec<fake_opentelemetry_collector::ExportedSpan>) {
128-
opentelemetry::global::shutdown_tracer_provider();
131+
let _ = self.tracer_provider.force_flush();
132+
129133
let otel_spans = self
130134
.fake_collector
131-
.exported_spans(1, std::time::Duration::from_millis(5000))
135+
.exported_spans(1, std::time::Duration::from_millis(100))
132136
.await;
133137
// insta::assert_debug_snapshot!(first_span);
134138
let tracing_events = std::iter::from_fn(|| {
135139
self.rx
136-
.recv_timeout(std::time::Duration::from_millis(500))
140+
.recv_timeout(std::time::Duration::from_millis(3))
141+
//.recv()
137142
.ok()
138143
})
139144
.map(|bytes| serde_json::from_slice::<Value>(&bytes).unwrap())

0 commit comments

Comments
 (0)