Skip to content

Commit 5cf034c

Browse files
frigus02TommyCpp
andauthored
Change SpanExporter::export &self to &mut self (#350)
This enforces the following part of the spec: > Export() will never be called concurrently for the same exporter > instance. Export() can be called again only after the current call > returns. None of the functions in the SpanExporter require it to be Sync. Export is guaranteed not to be called concurrently and shutdown should only be called once in total. This change means the stdout exporter no longer requires a Mutex around the Write. Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
1 parent fd8cf65 commit 5cf034c

File tree

11 files changed

+148
-88
lines changed

11 files changed

+148
-88
lines changed

opentelemetry-contrib/src/trace/exporter/datadog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ impl DatadogPipelineBuilder {
278278
#[async_trait]
279279
impl trace::SpanExporter for DatadogExporter {
280280
/// Export spans to datadog-agent
281-
async fn export(&self, batch: Vec<SpanData>) -> trace::ExportResult {
281+
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
282282
let data = self.version.encode(&self.service_name, batch)?;
283283
let req = Request::builder()
284284
.method(Method::POST)

opentelemetry-jaeger/src/agent.rs

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use crate::thrift::{
66
use crate::transport::{TBufferChannel, TNoopChannel};
77
use std::fmt;
88
use std::net::{ToSocketAddrs, UdpSocket};
9-
use std::sync::Mutex;
109
use thrift::{
1110
protocol::{TCompactInputProtocol, TCompactOutputProtocol},
1211
transport::{ReadHalf, TIoChannel, WriteHalf},
@@ -37,10 +36,10 @@ pub(crate) struct AgentAsyncClientUDP {
3736
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
3837
conn: UdpSocket,
3938
#[cfg(feature = "tokio")]
40-
conn: tokio::sync::Mutex<tokio::net::UdpSocket>,
39+
conn: tokio::net::UdpSocket,
4140
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
42-
conn: async_std::sync::Mutex<async_std::net::UdpSocket>,
43-
buffer_client: Mutex<BufferClient>,
41+
conn: async_std::net::UdpSocket,
42+
buffer_client: BufferClient,
4443
}
4544

4645
impl AgentAsyncClientUDP {
@@ -59,33 +58,18 @@ impl AgentAsyncClientUDP {
5958
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
6059
conn,
6160
#[cfg(feature = "tokio")]
62-
conn: tokio::sync::Mutex::new(tokio::net::UdpSocket::from_std(conn)?),
61+
conn: tokio::net::UdpSocket::from_std(conn)?,
6362
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
64-
conn: async_std::sync::Mutex::new(async_std::net::UdpSocket::from(conn)),
65-
buffer_client: Mutex::new(BufferClient { buffer, client }),
63+
conn: async_std::net::UdpSocket::from(conn),
64+
buffer_client: BufferClient { buffer, client },
6665
})
6766
}
6867

6968
/// Emit standard Jaeger batch
70-
pub(crate) async fn emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()> {
69+
pub(crate) async fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
7170
// Write payload to buffer
72-
let payload = self
73-
.buffer_client
74-
.lock()
75-
.map_err(|err| {
76-
thrift::Error::from(std::io::Error::new(
77-
std::io::ErrorKind::Other,
78-
err.to_string(),
79-
))
80-
})
81-
.and_then(|mut buffer_client| {
82-
// Write to tmp buffer
83-
buffer_client.client.emit_batch(batch)?;
84-
// extract written payload, clearing buffer
85-
let payload = buffer_client.buffer.take_bytes();
86-
87-
Ok(payload)
88-
})?;
71+
self.buffer_client.client.emit_batch(batch)?;
72+
let payload = self.buffer_client.buffer.take_bytes();
8973

9074
// Write async to socket, reading from buffer
9175
write_to_socket(self, payload).await?;
@@ -95,24 +79,22 @@ impl AgentAsyncClientUDP {
9579
}
9680

9781
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
98-
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
82+
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
9983
client.conn.send(&payload)?;
10084

10185
Ok(())
10286
}
10387

10488
#[cfg(feature = "tokio")]
105-
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
106-
let mut conn = client.conn.lock().await;
107-
conn.send(&payload).await?;
89+
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
90+
client.conn.send(&payload).await?;
10891

10992
Ok(())
11093
}
11194

11295
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
113-
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
114-
let conn = client.conn.lock().await;
115-
conn.send(&payload).await?;
96+
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
97+
client.conn.send(&payload).await?;
11698

11799
Ok(())
118100
}

opentelemetry-jaeger/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ impl Into<jaeger::Process> for Process {
258258
#[async_trait]
259259
impl trace::SpanExporter for Exporter {
260260
/// Export spans to Jaeger
261-
async fn export(&self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
261+
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
262262
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
263263
let mut process = self.process.clone();
264264

opentelemetry-jaeger/src/uploader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub(crate) enum BatchUploader {
1616

1717
impl BatchUploader {
1818
/// Emit a jaeger batch for the given uploader
19-
pub(crate) async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult {
19+
pub(crate) async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult {
2020
match self {
2121
BatchUploader::Agent(client) => {
2222
// TODO Implement retry behaviour

opentelemetry-otlp/src/span.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl Exporter {
146146

147147
#[async_trait]
148148
impl SpanExporter for Exporter {
149-
async fn export(&self, batch: Vec<SpanData>) -> ExportResult {
149+
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
150150
let request = ExportTraceServiceRequest {
151151
resource_spans: RepeatedField::from_vec(batch.into_iter().map(Into::into).collect()),
152152
unknown_fields: Default::default(),

opentelemetry-zipkin/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ impl ZipkinPipelineBuilder {
309309
#[async_trait]
310310
impl trace::SpanExporter for Exporter {
311311
/// Export spans to Zipkin collector.
312-
async fn export(&self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
312+
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
313313
let zipkin_spans = batch
314314
.into_iter()
315315
.map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span))

opentelemetry/src/api/trace/noop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ impl NoopSpanExporter {
181181

182182
#[async_trait]
183183
impl SpanExporter for NoopSpanExporter {
184-
async fn export(&self, _batch: Vec<SpanData>) -> ExportResult {
184+
async fn export(&mut self, _batch: Vec<SpanData>) -> ExportResult {
185185
Ok(())
186186
}
187187
}

opentelemetry/src/exporter/trace/mod.rs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,45 +23,43 @@ pub type ExportResult = Result<(), Box<dyn std::error::Error + Send + Sync + 'st
2323
/// implement so that they can be plugged into OpenTelemetry SDK and support
2424
/// sending of telemetry data.
2525
///
26-
/// The goals of the interface are:
27-
///
28-
/// - Minimize burden of implementation for protocol-dependent telemetry
29-
/// exporters. The protocol exporter is expected to be primarily a simple
30-
/// telemetry data encoder and transmitter.
31-
/// - Allow implementing helpers as composable components that use the same
32-
/// chainable Exporter interface. SDK authors are encouraged to implement common
33-
/// functionality such as queuing, batching, tagging, etc. as helpers. This
34-
/// functionality will be applicable regardless of what protocol exporter is used.
26+
/// The goal of the interface is to minimize burden of implementation for
27+
/// protocol-dependent telemetry exporters. The protocol exporter is expected to
28+
/// be primarily a simple telemetry data encoder and transmitter.
3529
#[async_trait]
36-
pub trait SpanExporter: Send + Sync + std::fmt::Debug {
37-
/// Exports a batch of telemetry data. Protocol exporters that will implement
38-
/// this function are typically expected to serialize and transmit the data
39-
/// to the destination.
30+
pub trait SpanExporter: Send + Debug {
31+
/// Exports a batch of readable spans. Protocol exporters that will
32+
/// implement this function are typically expected to serialize and transmit
33+
/// the data to the destination.
4034
///
4135
/// This function will never be called concurrently for the same exporter
4236
/// instance. It can be called again only after the current call returns.
4337
///
4438
/// This function must not block indefinitely, there must be a reasonable
4539
/// upper limit after which the call must time out with an error result.
46-
async fn export(&self, batch: Vec<SpanData>) -> ExportResult;
40+
///
41+
/// Any retry logic that is required by the exporter is the responsibility
42+
/// of the exporter.
43+
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult;
4744

4845
/// Shuts down the exporter. Called when SDK is shut down. This is an
4946
/// opportunity for exporter to do any cleanup required.
5047
///
51-
/// `shutdown` should be called only once for each Exporter instance. After
52-
/// the call to `shutdown`, subsequent calls to `SpanExport` are not allowed
53-
/// and should return an error.
48+
/// This function should be called only once for each `SpanExporter`
49+
/// instance. After the call to `shutdown`, subsequent calls to `export` are
50+
/// not allowed and should return an error.
5451
///
55-
/// Shutdown should not block indefinitely (e.g. if it attempts to flush the
56-
/// data and the destination is unavailable). SDK authors can
57-
/// decide if they want to make the shutdown timeout to be configurable.
52+
/// This function should not block indefinitely (e.g. if it attempts to
53+
/// flush the data and the destination is unavailable). SDK authors
54+
/// can decide if they want to make the shutdown timeout
55+
/// configurable.
5856
fn shutdown(&mut self) {}
5957
}
6058

6159
/// A minimal interface necessary for export spans over HTTP.
6260
///
63-
/// Users sometime choose http clients that relay on certain runtime. This trait allows users to bring
64-
/// their choice of http clients.
61+
/// Users sometime choose http clients that relay on certain runtime. This trait
62+
/// allows users to bring their choice of http clients.
6563
#[cfg(feature = "http")]
6664
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
6765
#[async_trait]

opentelemetry/src/exporter/trace/stdout.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ use crate::{
3131
};
3232
use async_trait::async_trait;
3333
use std::fmt::Debug;
34-
use std::io::{self, stdout, Stdout, Write};
35-
use std::sync::Mutex;
34+
use std::io::{stdout, Stdout, Write};
3635

3736
/// Pipeline builder
3837
#[derive(Debug)]
@@ -108,15 +107,15 @@ where
108107
/// [`Stdout`]: std::io::Stdout
109108
#[derive(Debug)]
110109
pub struct Exporter<W: Write> {
111-
writer: Mutex<W>,
110+
writer: W,
112111
pretty_print: bool,
113112
}
114113

115114
impl<W: Write> Exporter<W> {
116115
/// Create a new stdout `Exporter`.
117116
pub fn new(writer: W, pretty_print: bool) -> Self {
118117
Self {
119-
writer: Mutex::new(writer),
118+
writer,
120119
pretty_print,
121120
}
122121
}
@@ -128,16 +127,12 @@ where
128127
W: Write + Debug + Send + 'static,
129128
{
130129
/// Export spans to stdout
131-
async fn export(&self, batch: Vec<SpanData>) -> ExportResult {
132-
let mut writer = self
133-
.writer
134-
.lock()
135-
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
130+
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
136131
for span in batch {
137132
if self.pretty_print {
138-
writer.write_all(format!("{:#?}\n", span).as_bytes())?;
133+
self.writer.write_all(format!("{:#?}\n", span).as_bytes())?;
139134
} else {
140-
writer.write_all(format!("{:?}\n", span).as_bytes())?;
135+
self.writer.write_all(format!("{:?}\n", span).as_bytes())?;
141136
}
142137
}
143138

opentelemetry/src/sdk/trace/span_processor.rs

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,20 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
5858
/// Default maximum batch size
5959
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
6060

61-
/// `SpanProcessor`s allow hooks for span start and end method invocations.
61+
/// `SpanProcessor` is an interface which allows hooks for span start and end
62+
/// method invocations. The span processors are invoked only when is_recording
63+
/// is true.
6264
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
63-
/// `on_start` method is invoked when a `Span` is started.
65+
/// `on_start` is called when a `Span` is started. This method is called
66+
/// synchronously on the thread that started the span, therefore it should
67+
/// not block or throw exceptions.
6468
fn on_start(&self, span: &Span, cx: &Context);
65-
/// `on_end` method is invoked when a `Span` is ended.
69+
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
70+
/// already set). This method is called synchronously within the `Span::end`
71+
/// API, therefore it should not block or throw an exception.
6672
fn on_end(&self, span: SpanData);
67-
/// Shutdown is invoked when SDK shuts down. Use this call to cleanup any
68-
/// processor data. No calls to `on_start` and `on_end` method is invoked
69-
/// after `shutdown` call is made.
73+
/// Shuts down the processor. Called when SDK is shut down. This is an
74+
/// opportunity for processor to do any cleanup required.
7075
fn shutdown(&mut self);
7176
}
7277

@@ -95,12 +100,14 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
95100
/// [`SpanProcessor`]: ../../api/trace/span_processor/trait.SpanProcessor.html
96101
#[derive(Debug)]
97102
pub struct SimpleSpanProcessor {
98-
exporter: Box<dyn SpanExporter>,
103+
exporter: Mutex<Box<dyn SpanExporter>>,
99104
}
100105

101106
impl SimpleSpanProcessor {
102107
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
103-
SimpleSpanProcessor { exporter }
108+
SimpleSpanProcessor {
109+
exporter: Mutex::new(exporter),
110+
}
104111
}
105112
}
106113

@@ -110,12 +117,16 @@ impl SpanProcessor for SimpleSpanProcessor {
110117
}
111118

112119
fn on_end(&self, span: SpanData) {
113-
// TODO: Surface error through global error handler
114-
let _result = executor::block_on(self.exporter.export(vec![span]));
120+
if let Ok(mut exporter) = self.exporter.lock() {
121+
// TODO: Surface error through global error handler
122+
let _result = executor::block_on(exporter.export(vec![span]));
123+
}
115124
}
116125

117126
fn shutdown(&mut self) {
118-
self.exporter.shutdown();
127+
if let Ok(mut exporter) = self.exporter.lock() {
128+
exporter.shutdown();
129+
}
119130
}
120131
}
121132

@@ -433,14 +444,31 @@ where
433444

434445
#[cfg(test)]
435446
mod tests {
436-
use crate::exporter::trace::stdout;
437-
use crate::sdk::trace::span_processor::{
438-
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
439-
OTEL_BSP_SCHEDULE_DELAY_MILLIS, OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
447+
use super::{
448+
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
449+
OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS,
450+
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
440451
};
441-
use crate::sdk::trace::BatchSpanProcessor;
452+
use crate::exporter::trace::stdout;
453+
use crate::testing::trace::{new_test_export_span_data, new_test_exporter};
442454
use std::time;
443455

456+
#[test]
457+
fn simple_span_processor_on_end_calls_export() {
458+
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
459+
let processor = SimpleSpanProcessor::new(Box::new(exporter));
460+
processor.on_end(new_test_export_span_data());
461+
assert!(rx_export.try_recv().is_ok());
462+
}
463+
464+
#[test]
465+
fn simple_span_processor_shutdown_calls_shutdown() {
466+
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
467+
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
468+
processor.shutdown();
469+
assert!(rx_shutdown.try_recv().is_ok());
470+
}
471+
444472
#[test]
445473
fn test_build_batch_span_processor_from_env() {
446474
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");

0 commit comments

Comments
 (0)