Skip to content

Commit 8c62f6a

Browse files
committed
Collect agent info from sidecar
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent 22c2da5 commit 8c62f6a

File tree

10 files changed

+236
-6
lines changed

10 files changed

+236
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-pipeline/src/agent_info/fetcher.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ use tokio::time::sleep;
1717
#[allow(clippy::declare_interior_mutable_const)]
1818
const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state");
1919

20+
/// Whether the agent reported the same value or not.
2021
#[derive(Debug)]
21-
enum FetchInfoStatus {
22+
pub enum FetchInfoStatus {
23+
/// Unchanged
2224
SameState,
25+
/// Has a new state
2326
NewState(Box<AgentInfo>),
2427
}
2528

@@ -28,7 +31,7 @@ enum FetchInfoStatus {
2831
/// If the state hash is different from the current one:
2932
/// - Return a `FetchInfoStatus::NewState` of the info struct
3033
/// - Else return `FetchInfoStatus::SameState`
31-
async fn fetch_info_with_state(
34+
pub async fn fetch_info_with_state(
3235
info_endpoint: &Endpoint,
3336
current_state_hash: Option<&str>,
3437
) -> Result<FetchInfoStatus> {

data-pipeline/src/agent_info/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ mod fetcher;
1414
/// Stores an AgentInfo in an ArcSwap to be updated by an AgentInfoFetcher
1515
pub type AgentInfoArc = Arc<ArcSwapOption<schema::AgentInfo>>;
1616

17-
pub use fetcher::{fetch_info, AgentInfoFetcher};
17+
pub use fetcher::{fetch_info, fetch_info_with_state, AgentInfoFetcher, FetchInfoStatus};

data-pipeline/src/agent_info/schema.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33
//! This module provides struct representing the info endpoint response
4-
use serde::Deserialize;
4+
use serde::{Deserialize, Serialize};
55
use std::collections::HashMap;
66

77
/// Wrapper for an agent info response storing the state hash from the agent
@@ -15,7 +15,7 @@ pub struct AgentInfo {
1515

1616
/// Schema of an agent info response
1717
#[allow(missing_docs)]
18-
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
18+
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
1919
pub struct AgentInfoStruct {
2020
/// Version of the agent
2121
pub version: Option<String>,
@@ -38,7 +38,7 @@ pub struct AgentInfoStruct {
3838
}
3939

4040
#[allow(missing_docs)]
41-
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
41+
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
4242
pub struct Config {
4343
pub default_env: Option<String>,
4444
pub target_tps: Option<f64>,

sidecar-ffi/src/lib.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use datadog_sidecar::config;
1414
use datadog_sidecar::config::LogMethod;
1515
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
1616
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
17+
use datadog_sidecar::service::agent_info::AgentInfoReader;
1718
use datadog_sidecar::service::{
1819
blocking::{self, SidecarTransport},
1920
InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
@@ -910,3 +911,35 @@ pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi
910911
buf.copy_from_slice(str.as_bytes());
911912
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
912913
}
914+
915+
/// Gets an agent info reader.
916+
#[no_mangle]
917+
#[allow(clippy::missing_safety_doc)]
918+
pub unsafe extern "C" fn ddog_get_agent_info_reader(endpoint: &Endpoint) -> Box<AgentInfoReader> {
919+
Box::new(AgentInfoReader::new(endpoint))
920+
}
921+
922+
/// Gets the current agent info environment (or empty if not existing)
923+
#[no_mangle]
924+
#[allow(clippy::missing_safety_doc)]
925+
pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
926+
reader: &'a mut AgentInfoReader,
927+
changed: &mut bool,
928+
) -> ffi::CharSlice<'a> {
929+
let (has_changed, info) = reader.read();
930+
*changed = has_changed;
931+
let config = if let Some(info) = info {
932+
info.config.as_ref()
933+
} else {
934+
None
935+
};
936+
config
937+
.and_then(|c| c.default_env.as_ref())
938+
.map(|s| ffi::CharSlice::from(s.as_str()))
939+
.unwrap_or(ffi::CharSlice::empty())
940+
}
941+
942+
/// Drops the agent info reader.
943+
#[no_mangle]
944+
#[allow(clippy::missing_safety_doc)]
945+
pub unsafe extern "C" fn ddog_drop_agent_info_reader(_: Box<AgentInfoReader>) {}

sidecar/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ ddcommon = { path = "../ddcommon" }
2121
datadog-sidecar-macros = { path = "macros" }
2222

2323
ddtelemetry = { path = "../ddtelemetry", features = ["tracing"] }
24+
data-pipeline = { path = "../data-pipeline" }
2425
datadog-trace-protobuf = { path = "../trace-protobuf" }
2526
datadog-trace-utils = { path = "../trace-utils" }
2627
datadog-trace-normalization = { path = "../trace-normalization" }

sidecar/src/service/agent_info.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
use crate::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter};
4+
use crate::primary_sidecar_identifier;
5+
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
6+
use base64::Engine;
7+
use data_pipeline::agent_info::schema::AgentInfoStruct;
8+
use data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
9+
use datadog_ipc::platform::NamedShmHandle;
10+
use ddcommon::Endpoint;
11+
use futures::future::Shared;
12+
use futures::FutureExt;
13+
use http::uri::PathAndQuery;
14+
use manual_future::ManualFuture;
15+
use std::ffi::CString;
16+
use std::hash::{Hash, Hasher};
17+
use std::sync::{Arc, Mutex};
18+
use std::time::{Duration, Instant};
19+
use tokio::time::sleep;
20+
use tracing::{error, warn};
21+
use zwohash::{HashMap, ZwoHasher};
22+
23+
#[derive(Default, Clone)]
24+
pub struct AgentInfos(Arc<Mutex<HashMap<Endpoint, AgentInfoFetcher>>>);
25+
26+
impl AgentInfos {
27+
pub fn query_for(&self, endpoint: Endpoint) -> AgentInfoGuard {
28+
let mut infos_guard = self.0.lock().unwrap();
29+
if let Some(info) = infos_guard.get_mut(&endpoint) {
30+
info.rc += 1;
31+
} else {
32+
infos_guard.insert(
33+
endpoint.clone(),
34+
AgentInfoFetcher::new(self.clone(), endpoint.clone()),
35+
);
36+
}
37+
38+
AgentInfoGuard {
39+
infos: self.clone(),
40+
endpoint,
41+
}
42+
}
43+
}
44+
45+
pub struct AgentInfoGuard {
46+
infos: AgentInfos,
47+
endpoint: Endpoint,
48+
}
49+
50+
impl AgentInfoGuard {
51+
pub fn get(&self) -> Shared<ManualFuture<AgentInfoStruct>> {
52+
let infos_guard = self.infos.0.lock().unwrap();
53+
let infos = infos_guard.get(&self.endpoint).unwrap();
54+
infos.infos.clone()
55+
}
56+
}
57+
58+
impl Drop for AgentInfoGuard {
59+
fn drop(&mut self) {
60+
let mut infos_guard = self.infos.0.lock().unwrap();
61+
let info = infos_guard.get_mut(&self.endpoint).unwrap();
62+
info.last_update = Instant::now();
63+
info.rc -= 1;
64+
}
65+
}
66+
67+
pub struct AgentInfoFetcher {
68+
last_update: Instant,
69+
rc: u32,
70+
infos: Shared<ManualFuture<AgentInfoStruct>>,
71+
}
72+
73+
impl AgentInfoFetcher {
74+
fn new(agent_infos: AgentInfos, endpoint: Endpoint) -> AgentInfoFetcher {
75+
let (future, completer) = ManualFuture::new();
76+
tokio::spawn(async move {
77+
let mut state: Option<String> = None;
78+
let mut writer = None;
79+
let mut completer = Some(completer);
80+
let mut fetch_endpoint = endpoint.clone();
81+
let mut parts = fetch_endpoint.url.into_parts();
82+
parts.path_and_query = Some(PathAndQuery::from_static("/info"));
83+
fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap();
84+
loop {
85+
let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await;
86+
let mut complete_fut = None;
87+
{
88+
let mut infos_guard = agent_infos.0.lock().unwrap();
89+
let infos = infos_guard.get_mut(&endpoint).unwrap();
90+
if infos.rc == 0 && infos.last_update.elapsed().as_secs() > 60 {
91+
break;
92+
}
93+
match fetched {
94+
Ok(FetchInfoStatus::SameState) => {}
95+
Ok(FetchInfoStatus::NewState(status)) => {
96+
state = Some(status.state_hash);
97+
if writer.is_none() {
98+
writer = match OneWayShmWriter::<NamedShmHandle>::new(info_path(
99+
&endpoint,
100+
)) {
101+
Ok(writer) => Some(writer),
102+
Err(e) => {
103+
error!("Failed acquiring an agent info writer: {e:?}");
104+
None
105+
}
106+
};
107+
}
108+
if let Some(ref writer) = writer {
109+
writer.write(&serde_json::to_vec(&status.info).unwrap())
110+
}
111+
if let Some(completer) = completer {
112+
complete_fut = Some(completer.complete(status.info));
113+
} else {
114+
infos.infos = ManualFuture::new_completed(status.info).shared();
115+
}
116+
completer = None;
117+
}
118+
Err(e) => {
119+
warn!(
120+
"The agent info for {} could not be fetched: {}",
121+
fetch_endpoint.url, e
122+
);
123+
}
124+
}
125+
}
126+
if let Some(complete_fut) = complete_fut.take() {
127+
complete_fut.await;
128+
}
129+
sleep(Duration::from_secs(60)).await;
130+
}
131+
agent_infos.0.lock().unwrap().remove(&endpoint);
132+
});
133+
134+
AgentInfoFetcher {
135+
last_update: Instant::now(),
136+
rc: 1,
137+
infos: future.shared(),
138+
}
139+
}
140+
}
141+
142+
fn info_path(endpoint: &Endpoint) -> CString {
143+
let mut hasher = ZwoHasher::default();
144+
endpoint.hash(&mut hasher);
145+
let mut path = format!(
146+
"/ddinf{}-{}",
147+
primary_sidecar_identifier(),
148+
BASE64_URL_SAFE_NO_PAD.encode(hasher.finish().to_ne_bytes()),
149+
);
150+
// datadog agent info, on macos we're restricted to 31 chars
151+
path.truncate(31); // should not be larger than 31 chars, but be sure.
152+
CString::new(path).unwrap()
153+
}
154+
155+
pub struct AgentInfoReader {
156+
reader: OneWayShmReader<NamedShmHandle, CString>,
157+
info: Option<AgentInfoStruct>,
158+
}
159+
160+
impl AgentInfoReader {
161+
pub fn new(endpoint: &Endpoint) -> AgentInfoReader {
162+
let path = info_path(endpoint);
163+
AgentInfoReader {
164+
reader: OneWayShmReader::new(open_named_shm(&path).ok(), path),
165+
info: None,
166+
}
167+
}
168+
169+
pub fn read(&mut self) -> (bool, &Option<AgentInfoStruct>) {
170+
let (updated, data) = self.reader.read();
171+
if updated {
172+
match serde_json::from_slice(data) {
173+
Ok(info) => self.info = Some(info),
174+
Err(e) => error!("Failed deserializing the agent info: {e:?}"),
175+
}
176+
}
177+
(updated, &self.info)
178+
}
179+
}

sidecar/src/service/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use runtime_info::RuntimeInfo;
2727
use session_info::SessionInfo;
2828
use sidecar_interface::{SidecarInterface, SidecarInterfaceRequest, SidecarInterfaceResponse};
2929

30+
pub mod agent_info;
3031
pub mod blocking;
3132
pub mod exception_hash_rate_limiter;
3233
mod instance_id;

sidecar/src/service/session_info.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use tracing::{debug, error, info, trace};
1818
use crate::log::{MultiEnvFilterGuard, MultiWriterGuard};
1919
use crate::{spawn_map_err, tracer};
2020

21+
use crate::service::agent_info::AgentInfoGuard;
2122
use crate::service::{InstanceId, QueueId, RuntimeInfo};
23+
2224
/// `SessionInfo` holds information about a session.
2325
///
2426
/// It contains a list of runtimes, session configuration, tracer configuration, and log guards.
@@ -31,6 +33,7 @@ pub(crate) struct SessionInfo {
3133
tracer_config: Arc<Mutex<tracer::Config>>,
3234
dogstatsd: Arc<Mutex<Option<dogstatsd_client::Client>>>,
3335
remote_config_invariants: Arc<Mutex<Option<ConfigInvariants>>>,
36+
pub(crate) agent_infos: Arc<Mutex<Option<AgentInfoGuard>>>,
3437
pub(crate) remote_config_interval: Arc<Mutex<Duration>>,
3538
#[cfg(windows)]
3639
pub(crate) remote_config_notify_function:
@@ -50,6 +53,7 @@ impl Clone for SessionInfo {
5053
tracer_config: self.tracer_config.clone(),
5154
dogstatsd: self.dogstatsd.clone(),
5255
remote_config_invariants: self.remote_config_invariants.clone(),
56+
agent_infos: self.agent_infos.clone(),
5357
remote_config_interval: self.remote_config_interval.clone(),
5458
#[cfg(windows)]
5559
remote_config_notify_function: self.remote_config_notify_function.clone(),

sidecar/src/service/sidecar_server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use serde::{Deserialize, Serialize};
3939
use tokio::task::{JoinError, JoinHandle};
4040

4141
use crate::config::get_product_endpoint;
42+
use crate::service::agent_info::AgentInfos;
4243
use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER;
4344
use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs};
4445
use crate::service::runtime_info::ActiveApplication;
@@ -105,6 +106,8 @@ pub struct SidecarServer {
105106
Arc<Mutex<Option<ManualFutureCompleter<ddtelemetry::config::Config>>>>,
106107
/// Keeps track of the number of submitted payloads.
107108
pub submitted_payloads: Arc<AtomicU64>,
109+
/// All tracked agent infos per endpoint
110+
pub agent_infos: AgentInfos,
108111
/// All remote config handling
109112
remote_configs: RemoteConfigs,
110113
/// The ProcessHandle tied to the connection
@@ -689,6 +692,11 @@ impl SidecarInterface for SidecarServer {
689692
);
690693
cfg.set_endpoint(logs_endpoint, diagnostics_endpoint).ok();
691694
});
695+
if config.endpoint.api_key.is_none() {
696+
// no agent info if agentless
697+
*session.agent_infos.lock().unwrap() =
698+
Some(self.agent_infos.query_for(config.endpoint.clone()));
699+
}
692700
session.set_remote_config_invariants(ConfigInvariants {
693701
language: config.language,
694702
tracer_version: config.tracer_version,

0 commit comments

Comments
 (0)