Skip to content

Commit 9ded162

Browse files
authored
Reuse connections (#1249)
Refactor trace-utils to pass an http client so the connections are reused. Refactor data-pipeline to hold a client pool so the connection with the agent is reused. Refactor trace_flusher so it stores an http client. Remove Option passing client to the Stats Exporter. Merge branch 'main' into julio/reuse-connections Co-authored-by: julio.gonzalez <julio.gonzalez@datadoghq.com>
1 parent b9c68ca commit 9ded162

File tree

11 files changed

+95
-111
lines changed

11 files changed

+95
-111
lines changed

Cargo.lock

Lines changed: 0 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-pipeline/src/stats_exporter.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::trace_exporter::TracerMetadata;
1515
use datadog_trace_protobuf::pb;
1616
use datadog_trace_stats::span_concentrator::SpanConcentrator;
1717
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
18-
use ddcommon::{worker::Worker, Endpoint};
18+
use ddcommon::{worker::Worker, Endpoint, HttpClient};
1919
use hyper;
2020
use tokio::select;
2121
use tokio_util::sync::CancellationToken;
@@ -32,6 +32,7 @@ pub struct StatsExporter {
3232
meta: TracerMetadata,
3333
sequence_id: AtomicU64,
3434
cancellation_token: CancellationToken,
35+
client: HttpClient,
3536
}
3637

3738
impl StatsExporter {
@@ -49,6 +50,7 @@ impl StatsExporter {
4950
meta: TracerMetadata,
5051
endpoint: Endpoint,
5152
cancellation_token: CancellationToken,
53+
client: HttpClient,
5254
) -> Self {
5355
Self {
5456
flush_interval,
@@ -57,6 +59,7 @@ impl StatsExporter {
5759
meta,
5860
sequence_id: AtomicU64::new(0),
5961
cancellation_token,
62+
client,
6063
}
6164
}
6265

@@ -90,11 +93,11 @@ impl StatsExporter {
9093
);
9194

9295
let result = send_with_retry(
96+
&self.client,
9397
&self.endpoint,
9498
body,
9599
&headers,
96100
&RetryStrategy::default(),
97-
None,
98101
)
99102
.await;
100103

@@ -191,6 +194,7 @@ mod tests {
191194
use super::*;
192195
use datadog_trace_utils::span::{trace_utils, SpanSlice};
193196
use datadog_trace_utils::test_utils::poll_for_mock_hit;
197+
use ddcommon::hyper_migration::new_default_client;
194198
use httpmock::prelude::*;
195199
use httpmock::MockServer;
196200
use time::Duration;
@@ -267,6 +271,7 @@ mod tests {
267271
get_test_metadata(),
268272
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
269273
CancellationToken::new(),
274+
new_default_client(),
270275
);
271276

272277
let send_status = stats_exporter.send(true).await;
@@ -294,6 +299,7 @@ mod tests {
294299
get_test_metadata(),
295300
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
296301
CancellationToken::new(),
302+
new_default_client(),
297303
);
298304

299305
let send_status = stats_exporter.send(true).await;
@@ -326,6 +332,7 @@ mod tests {
326332
get_test_metadata(),
327333
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
328334
CancellationToken::new(),
335+
new_default_client(),
329336
);
330337

331338
tokio::time::pause();
@@ -366,6 +373,7 @@ mod tests {
366373
get_test_metadata(),
367374
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
368375
cancellation_token.clone(),
376+
new_default_client(),
369377
);
370378

371379
tokio::spawn(async move {

data-pipeline/src/trace_exporter/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::trace_exporter::{
1212
INFO_ENDPOINT,
1313
};
1414
use arc_swap::ArcSwap;
15+
use ddcommon::hyper_migration::new_default_client;
1516
use ddcommon::{parse_uri, tag, Endpoint};
1617
use dogstatsd_client::new;
1718
use std::sync::{Arc, Mutex};
@@ -339,6 +340,7 @@ impl TraceExporterBuilder {
339340
agent_payload_response_version: self
340341
.agent_rates_payload_version_enabled
341342
.then(AgentResponsePayloadVersion::new),
343+
http_client: new_default_client(),
342344
})
343345
}
344346

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ use datadog_trace_utils::send_with_retry::{
3636
};
3737
use datadog_trace_utils::span::{Span, SpanText};
3838
use datadog_trace_utils::trace_utils::TracerHeaderTags;
39-
use ddcommon::MutexExt;
4039
use ddcommon::{hyper_migration, Endpoint};
4140
use ddcommon::{tag, tag::Tag};
41+
use ddcommon::{HttpClient, MutexExt};
4242
use ddtelemetry::worker::TelemetryWorker;
4343
use dogstatsd_client::Client;
4444
use http_body_util::BodyExt;
@@ -208,6 +208,7 @@ pub struct TraceExporter {
208208
health_metrics_enabled: bool,
209209
workers: Arc<Mutex<TraceExporterWorkers>>,
210210
agent_payload_response_version: Option<AgentResponsePayloadVersion>,
211+
http_client: HttpClient,
211212
}
212213

213214
impl TraceExporter {
@@ -424,6 +425,7 @@ impl TraceExporter {
424425
&agent_info,
425426
&self.client_side_stats,
426427
&self.workers,
428+
self.http_client.clone(),
427429
);
428430
}
429431
StatsComputationStatus::Enabled {
@@ -627,7 +629,8 @@ impl TraceExporter {
627629
let payload_len = mp_payload.len();
628630

629631
// Send traces to the agent
630-
let result = send_with_retry(endpoint, mp_payload, &headers, &strategy, None).await;
632+
let result =
633+
send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
631634

632635
// Emit http.requests health metric based on number of attempts
633636
let requests_count = match &result {

data-pipeline/src/trace_exporter/stats.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::agent_info::schema::AgentInfo;
1111
use crate::stats_exporter;
1212
use arc_swap::ArcSwap;
1313
use datadog_trace_stats::span_concentrator::SpanConcentrator;
14-
use ddcommon::{Endpoint, MutexExt};
14+
use ddcommon::{Endpoint, HttpClient, MutexExt};
1515
use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717
use tokio::runtime::Runtime;
@@ -64,6 +64,7 @@ pub(crate) fn start_stats_computation(
6464
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
6565
span_kinds: Vec<String>,
6666
peer_tags: Vec<String>,
67+
client: HttpClient,
6768
) -> anyhow::Result<()> {
6869
if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() {
6970
let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new(
@@ -80,6 +81,7 @@ pub(crate) fn start_stats_computation(
8081
&cancellation_token,
8182
workers,
8283
client_side_stats,
84+
client,
8385
)?;
8486
}
8587
Ok(())
@@ -93,13 +95,15 @@ fn create_and_start_stats_worker(
9395
cancellation_token: &CancellationToken,
9496
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
9597
client_side_stats: &ArcSwap<StatsComputationStatus>,
98+
client: HttpClient,
9699
) -> anyhow::Result<()> {
97100
let stats_exporter = stats_exporter::StatsExporter::new(
98101
bucket_size,
99102
stats_concentrator.clone(),
100103
ctx.metadata.clone(),
101104
Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)),
102105
cancellation_token.clone(),
106+
client,
103107
);
104108
let mut stats_worker = crate::pausable_worker::PausableWorker::new(stats_exporter);
105109

@@ -160,6 +164,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
160164
agent_info: &Arc<AgentInfo>,
161165
client_side_stats: &ArcSwap<StatsComputationStatus>,
162166
workers: &Arc<Mutex<super::TraceExporterWorkers>>,
167+
client: HttpClient,
163168
) {
164169
if agent_info.info.client_drop_p0s.is_some_and(|v| v) {
165170
// Client-side stats is supported by the agent
@@ -169,6 +174,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
169174
workers,
170175
get_span_kinds_for_stats(agent_info),
171176
agent_info.info.peer_tags.clone().unwrap_or_default(),
177+
client,
172178
);
173179
match status {
174180
Ok(()) => info!("Client-side stats enabled"),

datadog-sidecar/src/service/tracing/trace_flusher.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use datadog_ipc::platform::NamedShmHandle;
77
use datadog_trace_utils::trace_utils;
88
use datadog_trace_utils::trace_utils::SendData;
99
use datadog_trace_utils::trace_utils::SendDataResult;
10-
use ddcommon::{Endpoint, MutexExt};
10+
use ddcommon::hyper_migration::new_default_client;
11+
use ddcommon::{Endpoint, HttpClient, MutexExt};
1112
use futures::future::join_all;
1213
use http_body_util::BodyExt;
1314
use manual_future::{ManualFuture, ManualFutureCompleter};
@@ -95,6 +96,7 @@ pub(crate) struct TraceFlusher {
9596
pub(crate) min_force_drop_size_bytes: AtomicU32, // put a limit on memory usage
9697
remote_config: Mutex<AgentRemoteConfigs>,
9798
pub metrics: Mutex<TraceFlusherMetrics>,
99+
client: HttpClient,
98100
}
99101
impl Default for TraceFlusher {
100102
fn default() -> Self {
@@ -105,6 +107,7 @@ impl Default for TraceFlusher {
105107
min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32),
106108
remote_config: Mutex::new(Default::default()),
107109
metrics: Mutex::new(Default::default()),
110+
client: new_default_client(),
108111
}
109112
}
110113
}
@@ -246,7 +249,7 @@ impl TraceFlusher {
246249

247250
async fn send_and_handle_trace(&self, send_data: SendData) {
248251
let endpoint = send_data.get_target().clone();
249-
let response = send_data.send().await;
252+
let response = send_data.send(&self.client).await;
250253
self.metrics.lock_or_panic().update(&response);
251254
match response.last_result {
252255
Ok(response) => {

datadog-trace-utils/Cargo.toml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ tinybytes = { path = "../tinybytes", features = [
3838
"serialization",
3939
] }
4040

41-
# Proxy feature
42-
hyper-http-proxy = { version = "1.1.0", default-features = false, features = [
43-
"rustls-tls-webpki-roots",
44-
], optional = true }
45-
4641
# Compression feature
4742
flate2 = { version = "1.0", optional = true }
4843
zstd = { version = "0.13.3", default-features = false, optional = true }
@@ -66,15 +61,14 @@ tempfile = "3.3.0"
6661
[features]
6762
default = ["https"]
6863
https = ["ddcommon/https"]
69-
mini_agent = ["proxy", "compression", "ddcommon/use_webpki_roots"]
64+
mini_agent = ["compression", "ddcommon/use_webpki_roots"]
7065
test-utils = [
7166
"hyper/server",
7267
"httpmock",
7368
"cargo_metadata",
7469
"cargo-platform",
7570
"urlencoding",
7671
]
77-
proxy = ["hyper-http-proxy"]
7872
compression = ["zstd", "flate2"]
7973
# FIPS mode uses the FIPS-compliant cryptographic provider (Unix only)
8074
fips = ["ddcommon/fips"]

0 commit comments

Comments
 (0)