Skip to content

Commit 0704793

Browse files
authored
feat(jaeger): better configuration pipeline. (#748)
* feat(jaeger): better configuration pipeline. - Separate agent pipeline and collector pipeline. it's now `new_agent_pipeline` and `new_collector_pipeline` - Add `Configurable` trait to include common attributes shared by agent pipeline and collector pipeline. - Removed `with_tag` method. - Make build in http client additive. `surf_collector_client`, `isahc_collector_client`, etc. are now just allow user to choose the http client. * fix(jaeger): Move CommonConfig and HasRequiredConfig to private mod to meet MSRV requirement. Rename CommonConfig to TransformationConfig. * chore: make format happy. * chore: make msrv happy. * test: add unit tests. * refactor(jaeger): removed the `Configurable` trait * fix(jaeger): fix code link
1 parent 7c4545a commit 0704793

File tree

21 files changed

+1424
-919
lines changed

21 files changed

+1424
-919
lines changed

examples/actix-http/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use opentelemetry::{
1010
};
1111

1212
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
13-
opentelemetry_jaeger::new_pipeline()
14-
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
13+
opentelemetry_jaeger::new_collector_pipeline()
14+
.with_endpoint("http://127.0.0.1:14268/api/traces")
1515
.with_service_name("trace-http-demo")
1616
.install_batch(opentelemetry::runtime::TokioCurrentThread)
1717
}

examples/actix-udp/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use opentelemetry::{
99
};
1010

1111
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
12-
opentelemetry_jaeger::new_pipeline()
13-
.with_agent_endpoint("localhost:6831")
12+
opentelemetry_jaeger::new_agent_pipeline()
13+
.with_endpoint("localhost:6831")
1414
.with_service_name("trace-udp-demo")
1515
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
1616
opentelemetry::sdk::Resource::new(vec![

examples/async/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
5454
}
5555

5656
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
57-
opentelemetry_jaeger::new_pipeline()
57+
opentelemetry_jaeger::new_agent_pipeline()
5858
.with_service_name("trace-demo")
5959
.install_batch(opentelemetry::runtime::Tokio)
6060
}

examples/basic/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::error::Error;
1414
use std::time::Duration;
1515

1616
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
17-
opentelemetry_jaeger::new_pipeline()
17+
opentelemetry_jaeger::new_agent_pipeline()
1818
.with_service_name("trace-demo")
1919
.with_trace_config(Config::default().with_resource(Resource::new(vec![
2020
KeyValue::new("service.name", "new_service"),

examples/grpc/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub mod hello_world {
3030

3131
fn tracing_init() -> TraceResult<Tracer> {
3232
global::set_text_map_propagator(TraceContextPropagator::new());
33-
opentelemetry_jaeger::new_pipeline()
33+
opentelemetry_jaeger::new_agent_pipeline()
3434
.with_service_name("grpc-client")
3535
.install_simple()
3636
}

examples/grpc/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl Greeter for MyGreeter {
6161

6262
fn tracing_init() -> Result<impl Tracer, TraceError> {
6363
global::set_text_map_propagator(TraceContextPropagator::new());
64-
opentelemetry_jaeger::new_pipeline()
64+
opentelemetry_jaeger::new_agent_pipeline()
6565
.with_service_name("grpc-server")
6666
.install_simple()
6767
}

examples/multiple-span-processors/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ use std::time::Duration;
1010
fn init_tracer() -> Result<(), TraceError> {
1111
// build a jaeger batch span processor
1212
let jaeger_processor = BatchSpanProcessor::builder(
13-
opentelemetry_jaeger::new_pipeline()
13+
opentelemetry_jaeger::new_agent_pipeline()
1414
.with_service_name("trace-demo")
1515
.with_trace_config(
1616
Config::default()
1717
.with_resource(Resource::new(vec![KeyValue::new("exporter", "jaeger")])),
1818
)
19-
.init_async_exporter(opentelemetry::runtime::Tokio)?,
19+
.build_async_agent_exporter(opentelemetry::runtime::Tokio)?,
2020
opentelemetry::runtime::Tokio,
2121
)
2222
.build();

opentelemetry-jaeger/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use opentelemetry::trace::Tracer;
4747

4848
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
4949
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
50-
let tracer = opentelemetry_jaeger::new_pipeline().install_simple()?;
50+
let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
5151

5252
tracer.in_span("doing_work", |cx| {
5353
// Traced app logic here...
@@ -76,7 +76,7 @@ opentelemetry-jaeger = { version = "*", features = ["rt-tokio"] }
7676
```
7777

7878
```rust
79-
let tracer = opentelemetry_jaeger::new_pipeline()
79+
let tracer = opentelemetry_jaeger::new_agent_pipeline()
8080
.install_batch(opentelemetry::runtime::Tokio)?;
8181
```
8282

@@ -120,11 +120,11 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:
120120
use opentelemetry::trace::Tracer;
121121

122122
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
123-
let tracer = opentelemetry_jaeger::new_pipeline()
124-
.with_collector_endpoint("http://localhost:14268/api/traces")
123+
let tracer = opentelemetry_jaeger::new_collector_pipeline()
124+
.with_endpoint("http://localhost:14268/api/traces")
125125
// optionally set username and password as well.
126-
.with_collector_username("username")
127-
.with_collector_password("s3cr3t")
126+
.with_username("username")
127+
.with_password("s3cr3t")
128128
.install_batch()?;
129129

130130
tracer.in_span("doing_work", |cx| {

opentelemetry-jaeger/src/exporter/agent.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@ use thrift::{
1212
transport::{ReadHalf, TIoChannel, WriteHalf},
1313
};
1414

15-
/// The max size of UDP packet we want to send, synced with jaeger-agent
16-
const UDP_PACKET_MAX_LENGTH: usize = 65_000;
17-
1815
struct BufferClient {
1916
buffer: ReadHalf<TBufferChannel>,
2017
client: agent::AgentSyncClient<
@@ -47,10 +44,9 @@ impl AgentSyncClientUdp {
4744
/// Create a new UDP agent client
4845
pub(crate) fn new<T: ToSocketAddrs>(
4946
host_port: T,
50-
max_packet_size: Option<usize>,
47+
max_packet_size: usize,
5148
auto_split: bool,
5249
) -> thrift::Result<Self> {
53-
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
5450
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
5551
let client = agent::AgentSyncClient::new(
5652
TCompactInputProtocol::new(TNoopChannel),
@@ -106,11 +102,10 @@ impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
106102
/// Create a new UDP agent client
107103
pub(crate) fn new<T: ToSocketAddrs>(
108104
host_port: T,
109-
max_packet_size: Option<usize>,
105+
max_packet_size: usize,
110106
runtime: R,
111107
auto_split: bool,
112108
) -> thrift::Result<Self> {
113-
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
114109
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
115110
let client = agent::AgentSyncClient::new(
116111
TCompactInputProtocol::new(TNoopChannel),

opentelemetry-jaeger/src/exporter/collector.rs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,14 @@
11
//! # HTTP Jaeger Collector Client
2+
//!
3+
#[cfg(feature = "collector_client")]
24
use http::Uri;
35
#[cfg(feature = "collector_client")]
46
use opentelemetry_http::{HttpClient, ResponseExt as _};
5-
use std::sync::atomic::AtomicUsize;
6-
7-
/// `CollectorAsyncClientHttp` implements an async version of the
8-
/// `TCollectorSyncClient` interface over HTTP
9-
#[derive(Debug)]
10-
pub(crate) struct CollectorAsyncClientHttp {
11-
endpoint: Uri,
12-
#[cfg(feature = "collector_client")]
13-
client: Box<dyn HttpClient>,
14-
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
15-
client: WasmHttpClient,
16-
payload_size_estimate: AtomicUsize,
17-
}
187

8+
#[cfg(feature = "collector_client")]
9+
pub(crate) use collector_client::AsyncHttpClient;
1910
#[cfg(feature = "wasm_collector_client")]
20-
#[derive(Debug)]
21-
struct WasmHttpClient {
22-
_auth: Option<String>,
23-
}
11+
pub(crate) use wasm_collector_client::WasmCollector;
2412

2513
#[cfg(feature = "collector_client")]
2614
mod collector_client {
@@ -31,14 +19,23 @@ mod collector_client {
3119
use std::sync::atomic::{AtomicUsize, Ordering};
3220
use thrift::protocol::TBinaryOutputProtocol;
3321

34-
impl CollectorAsyncClientHttp {
22+
/// `AsyncHttpClient` implements an async version of the
23+
/// `TCollectorSyncClient` interface over HTTP
24+
#[derive(Debug)]
25+
pub(crate) struct AsyncHttpClient {
26+
endpoint: Uri,
27+
http_client: Box<dyn HttpClient>,
28+
payload_size_estimate: AtomicUsize,
29+
}
30+
31+
impl AsyncHttpClient {
3532
/// Create a new HTTP collector client
3633
pub(crate) fn new(endpoint: Uri, client: Box<dyn HttpClient>) -> Self {
3734
let payload_size_estimate = AtomicUsize::new(512);
3835

39-
CollectorAsyncClientHttp {
36+
AsyncHttpClient {
4037
endpoint,
41-
client,
38+
http_client: client,
4239
payload_size_estimate,
4340
}
4441
}
@@ -68,15 +65,14 @@ mod collector_client {
6865
.expect("request should always be valid");
6966

7067
// Send request to collector
71-
let _ = self.client.send(req).await?.error_for_status()?;
68+
let _ = self.http_client.send(req).await?.error_for_status()?;
7269
Ok(())
7370
}
7471
}
7572
}
7673

77-
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
74+
#[cfg(feature = "wasm_collector_client")]
7875
mod wasm_collector_client {
79-
use super::*;
8076
use crate::exporter::thrift::jaeger;
8177
use futures_util::future;
8278
use http::Uri;
@@ -91,7 +87,19 @@ mod wasm_collector_client {
9187
use wasm_bindgen_futures::JsFuture;
9288
use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response};
9389

94-
impl CollectorAsyncClientHttp {
90+
#[derive(Debug)]
91+
pub(crate) struct WasmCollector {
92+
endpoint: Uri,
93+
payload_size_estimate: AtomicUsize,
94+
client: WasmHttpClient,
95+
}
96+
97+
#[derive(Debug, Default)]
98+
struct WasmHttpClient {
99+
auth: Option<String>,
100+
}
101+
102+
impl WasmCollector {
95103
/// Create a new HTTP collector client
96104
pub(crate) fn new(
97105
endpoint: Uri,
@@ -111,7 +119,7 @@ mod wasm_collector_client {
111119

112120
Ok(Self {
113121
endpoint,
114-
client: WasmHttpClient { _auth: auth },
122+
client: WasmHttpClient { auth },
115123
payload_size_estimate,
116124
})
117125
}

0 commit comments

Comments
 (0)