Skip to content

Commit f98ef88

Browse files
committed
feat(jaeger): better configuration pipeline.
- Separate agent pipeline and collector pipeline. it's now `new_agent_pipeline` and `new_collector_pipeline` - Add `Configurable` trait to include common attributes shared by agent pipeline and collector pipeline. - Removed `with_tag` method. - Make build in http client additive. `surf_collector_client`, `isahc_collector_client`, etc. are now just allow user to choose the http client.
1 parent e09392d commit f98ef88

File tree

21 files changed

+1414
-920
lines changed

21 files changed

+1414
-920
lines changed

examples/actix-http/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ use opentelemetry::{
88
trace::{FutureExt, TraceContextExt, Tracer},
99
Key,
1010
};
11+
use opentelemetry_jaeger::Configurable;
1112

1213
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
13-
opentelemetry_jaeger::new_pipeline()
14-
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
14+
opentelemetry_jaeger::new_collector_pipeline()
15+
.with_endpoint("http://127.0.0.1:14268/api/traces")
1516
.with_service_name("trace-http-demo")
1617
.install_batch(opentelemetry::runtime::TokioCurrentThread)
1718
}

examples/actix-udp/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ use opentelemetry::{
77
trace::{FutureExt, TraceContextExt, Tracer},
88
Key,
99
};
10+
use opentelemetry_jaeger::Configurable;
1011

1112
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
12-
opentelemetry_jaeger::new_pipeline()
13-
.with_agent_endpoint("localhost:6831")
13+
opentelemetry_jaeger::new_agent_pipeline()
14+
.with_endpoint("localhost:6831")
1415
.with_service_name("trace-udp-demo")
1516
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
1617
opentelemetry::sdk::Resource::new(vec![

examples/async/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use opentelemetry::{
2424
trace::{FutureExt, TraceContextExt, Tracer},
2525
Context,
2626
};
27+
use opentelemetry_jaeger::Configurable;
2728
use std::{error::Error, io, net::SocketAddr};
2829
use tokio::io::AsyncWriteExt;
2930
use tokio::net::TcpStream;
@@ -54,7 +55,7 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
5455
}
5556

5657
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
57-
opentelemetry_jaeger::new_pipeline()
58+
opentelemetry_jaeger::new_agent_pipeline()
5859
.with_service_name("trace-demo")
5960
.install_batch(opentelemetry::runtime::Tokio)
6061
}

examples/basic/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use opentelemetry::{
1010
trace::{TraceContextExt, Tracer},
1111
Context, Key, KeyValue,
1212
};
13+
use opentelemetry_jaeger::Configurable;
1314
use std::error::Error;
1415
use std::time::Duration;
1516

1617
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
17-
opentelemetry_jaeger::new_pipeline()
18+
opentelemetry_jaeger::new_agent_pipeline()
1819
.with_service_name("trace-demo")
1920
.with_trace_config(Config::default().with_resource(Resource::new(vec![
2021
KeyValue::new("service.name", "new_service"),

examples/grpc/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use opentelemetry::{
1010
trace::{TraceContextExt, Tracer as _},
1111
Context, KeyValue,
1212
};
13+
use opentelemetry_jaeger::Configurable;
1314

1415
struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);
1516

@@ -30,7 +31,7 @@ pub mod hello_world {
3031

3132
fn tracing_init() -> TraceResult<Tracer> {
3233
global::set_text_map_propagator(TraceContextPropagator::new());
33-
opentelemetry_jaeger::new_pipeline()
34+
opentelemetry_jaeger::new_agent_pipeline()
3435
.with_service_name("grpc-client")
3536
.install_simple()
3637
}

examples/grpc/src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use opentelemetry::{
1010
trace::{Span, Tracer},
1111
KeyValue,
1212
};
13+
use opentelemetry_jaeger::Configurable;
1314
use std::error::Error;
1415

1516
pub mod hello_world {
@@ -61,7 +62,7 @@ impl Greeter for MyGreeter {
6162

6263
fn tracing_init() -> Result<impl Tracer, TraceError> {
6364
global::set_text_map_propagator(TraceContextPropagator::new());
64-
opentelemetry_jaeger::new_pipeline()
65+
opentelemetry_jaeger::new_agent_pipeline()
6566
.with_service_name("grpc-server")
6667
.install_simple()
6768
}

examples/multiple-span-processors/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ use opentelemetry::sdk::trace::{BatchSpanProcessor, Config, TracerProvider};
44
use opentelemetry::sdk::Resource;
55
use opentelemetry::trace::{mark_span_as_active, TraceError, Tracer};
66
use opentelemetry::KeyValue;
7+
use opentelemetry_jaeger::Configurable;
78
use std::io::stdout;
89
use std::time::Duration;
910

1011
fn init_tracer() -> Result<(), TraceError> {
1112
// build a jaeger batch span processor
1213
let jaeger_processor = BatchSpanProcessor::builder(
13-
opentelemetry_jaeger::new_pipeline()
14+
opentelemetry_jaeger::new_agent_pipeline()
1415
.with_service_name("trace-demo")
1516
.with_trace_config(
1617
Config::default()
1718
.with_resource(Resource::new(vec![KeyValue::new("exporter", "jaeger")])),
1819
)
19-
.init_async_exporter(opentelemetry::runtime::Tokio)?,
20+
.build_async_agent_exporter(opentelemetry::runtime::Tokio)?,
2021
opentelemetry::runtime::Tokio,
2122
)
2223
.build();

opentelemetry-jaeger/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use opentelemetry::trace::Tracer;
4747

4848
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
4949
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
50-
let tracer = opentelemetry_jaeger::new_pipeline().install_simple()?;
50+
let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
5151

5252
tracer.in_span("doing_work", |cx| {
5353
// Traced app logic here...
@@ -76,7 +76,7 @@ opentelemetry-jaeger = { version = "*", features = ["rt-tokio"] }
7676
```
7777

7878
```rust
79-
let tracer = opentelemetry_jaeger::new_pipeline()
79+
let tracer = opentelemetry_jaeger::new_agent_pipeline()
8080
.install_batch(opentelemetry::runtime::Tokio)?;
8181
```
8282

@@ -120,11 +120,11 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:
120120
use opentelemetry::trace::Tracer;
121121

122122
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
123-
let tracer = opentelemetry_jaeger::new_pipeline()
124-
.with_collector_endpoint("http://localhost:14268/api/traces")
123+
let tracer = opentelemetry_jaeger::new_collector_pipeline()
124+
.with_endpoint("http://localhost:14268/api/traces")
125125
// optionally set username and password as well.
126-
.with_collector_username("username")
127-
.with_collector_password("s3cr3t")
126+
.with_username("username")
127+
.with_password("s3cr3t")
128128
.install_batch()?;
129129

130130
tracer.in_span("doing_work", |cx| {

opentelemetry-jaeger/src/exporter/agent.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ use thrift::{
1212
transport::{ReadHalf, TIoChannel, WriteHalf},
1313
};
1414

15-
/// The max size of UDP packet we want to send, synced with jaeger-agent
16-
const UDP_PACKET_MAX_LENGTH: usize = 65_000;
17-
1815
struct BufferClient {
1916
buffer: ReadHalf<TBufferChannel>,
2017
client: agent::AgentSyncClient<
@@ -47,10 +44,9 @@ impl AgentSyncClientUdp {
4744
/// Create a new UDP agent client
4845
pub(crate) fn new<T: ToSocketAddrs>(
4946
host_port: T,
50-
max_packet_size: Option<usize>,
47+
max_packet_size: usize,
5148
auto_split: bool,
5249
) -> thrift::Result<Self> {
53-
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
5450
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
5551
let client = agent::AgentSyncClient::new(
5652
TCompactInputProtocol::new(TNoopChannel),
@@ -106,11 +102,10 @@ impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
106102
/// Create a new UDP agent client
107103
pub(crate) fn new<T: ToSocketAddrs>(
108104
host_port: T,
109-
max_packet_size: Option<usize>,
105+
max_packet_size: usize,
110106
runtime: R,
111107
auto_split: bool,
112108
) -> thrift::Result<Self> {
113-
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
114109
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
115110
let client = agent::AgentSyncClient::new(
116111
TCompactInputProtocol::new(TNoopChannel),

opentelemetry-jaeger/src/exporter/collector.rs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,14 @@
11
//! # HTTP Jaeger Collector Client
2+
//!
3+
#[cfg(feature = "collector_client")]
24
use http::Uri;
35
#[cfg(feature = "collector_client")]
46
use opentelemetry_http::{HttpClient, ResponseExt as _};
5-
use std::sync::atomic::AtomicUsize;
6-
7-
/// `CollectorAsyncClientHttp` implements an async version of the
8-
/// `TCollectorSyncClient` interface over HTTP
9-
#[derive(Debug)]
10-
pub(crate) struct CollectorAsyncClientHttp {
11-
endpoint: Uri,
12-
#[cfg(feature = "collector_client")]
13-
client: Box<dyn HttpClient>,
14-
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
15-
client: WasmHttpClient,
16-
payload_size_estimate: AtomicUsize,
17-
}
187

8+
#[cfg(feature = "collector_client")]
9+
pub(crate) use collector_client::AsyncHttpClient;
1910
#[cfg(feature = "wasm_collector_client")]
20-
#[derive(Debug)]
21-
struct WasmHttpClient {
22-
_auth: Option<String>,
23-
}
11+
pub(crate) use wasm_collector_client::WasmCollector;
2412

2513
#[cfg(feature = "collector_client")]
2614
mod collector_client {
@@ -31,14 +19,23 @@ mod collector_client {
3119
use std::sync::atomic::{AtomicUsize, Ordering};
3220
use thrift::protocol::TBinaryOutputProtocol;
3321

34-
impl CollectorAsyncClientHttp {
22+
/// `AsyncHttpClient` implements an async version of the
23+
/// `TCollectorSyncClient` interface over HTTP
24+
#[derive(Debug)]
25+
pub(crate) struct AsyncHttpClient {
26+
endpoint: Uri,
27+
http_client: Box<dyn HttpClient>,
28+
payload_size_estimate: AtomicUsize,
29+
}
30+
31+
impl AsyncHttpClient {
3532
/// Create a new HTTP collector client
3633
pub(crate) fn new(endpoint: Uri, client: Box<dyn HttpClient>) -> Self {
3734
let payload_size_estimate = AtomicUsize::new(512);
3835

39-
CollectorAsyncClientHttp {
36+
AsyncHttpClient {
4037
endpoint,
41-
client,
38+
http_client: client,
4239
payload_size_estimate,
4340
}
4441
}
@@ -68,15 +65,14 @@ mod collector_client {
6865
.expect("request should always be valid");
6966

7067
// Send request to collector
71-
let _ = self.client.send(req).await?.error_for_status()?;
68+
let _ = self.http_client.send(req).await?.error_for_status()?;
7269
Ok(())
7370
}
7471
}
7572
}
7673

77-
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
74+
#[cfg(feature = "wasm_collector_client")]
7875
mod wasm_collector_client {
79-
use super::*;
8076
use crate::exporter::thrift::jaeger;
8177
use futures_util::future;
8278
use http::Uri;
@@ -91,7 +87,19 @@ mod wasm_collector_client {
9187
use wasm_bindgen_futures::JsFuture;
9288
use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response};
9389

94-
impl CollectorAsyncClientHttp {
90+
#[derive(Debug)]
91+
pub(crate) struct WasmCollector {
92+
endpoint: Uri,
93+
payload_size_estimate: AtomicUsize,
94+
client: WasmHttpClient,
95+
}
96+
97+
#[derive(Debug, Default)]
98+
struct WasmHttpClient {
99+
auth: Option<String>,
100+
}
101+
102+
impl WasmCollector {
95103
/// Create a new HTTP collector client
96104
pub(crate) fn new(
97105
endpoint: Uri,
@@ -111,7 +119,7 @@ mod wasm_collector_client {
111119

112120
Ok(Self {
113121
endpoint,
114-
client: WasmHttpClient { _auth: auth },
122+
client: WasmHttpClient { auth },
115123
payload_size_estimate,
116124
})
117125
}

0 commit comments

Comments
 (0)