Skip to content

Commit 7039e0f

Browse files
authored
Collect agent info from sidecar (#701)
* Collect agent info from sidecar Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com> * Add comments and test for AgentInfo in sidecar Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com> --------- Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent aed5ac8 commit 7039e0f

File tree

10 files changed

+300
-6
lines changed

10 files changed

+300
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-pipeline/src/agent_info/fetcher.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ use tokio::time::sleep;
1717
#[allow(clippy::declare_interior_mutable_const)]
1818
const DATADOG_AGENT_STATE: HeaderName = HeaderName::from_static("datadog-agent-state");
1919

20+
/// Whether the agent reported the same value or not.
2021
#[derive(Debug)]
21-
enum FetchInfoStatus {
22+
pub enum FetchInfoStatus {
23+
/// Unchanged
2224
SameState,
25+
/// Has a new state
2326
NewState(Box<AgentInfo>),
2427
}
2528

@@ -28,7 +31,7 @@ enum FetchInfoStatus {
2831
/// If the state hash is different from the current one:
2932
/// - Return a `FetchInfoStatus::NewState` of the info struct
3033
/// - Else return `FetchInfoStatus::SameState`
31-
async fn fetch_info_with_state(
34+
pub async fn fetch_info_with_state(
3235
info_endpoint: &Endpoint,
3336
current_state_hash: Option<&str>,
3437
) -> Result<FetchInfoStatus> {

data-pipeline/src/agent_info/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ mod fetcher;
1414
/// Stores an AgentInfo in an ArcSwap to be updated by an AgentInfoFetcher
1515
pub type AgentInfoArc = Arc<ArcSwapOption<schema::AgentInfo>>;
1616

17-
pub use fetcher::{fetch_info, AgentInfoFetcher};
17+
pub use fetcher::{fetch_info, fetch_info_with_state, AgentInfoFetcher, FetchInfoStatus};

data-pipeline/src/agent_info/schema.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33
//! This module provides structs representing the info endpoint response
4-
use serde::Deserialize;
4+
use serde::{Deserialize, Serialize};
55
use std::collections::HashMap;
66

77
/// Wrapper for an agent info response storing the state hash from the agent
@@ -15,7 +15,7 @@ pub struct AgentInfo {
1515

1616
/// Schema of an agent info response
1717
#[allow(missing_docs)]
18-
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
18+
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
1919
pub struct AgentInfoStruct {
2020
/// Version of the agent
2121
pub version: Option<String>,
@@ -38,7 +38,7 @@ pub struct AgentInfoStruct {
3838
}
3939

4040
#[allow(missing_docs)]
41-
#[derive(Clone, Deserialize, Default, Debug, PartialEq)]
41+
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
4242
pub struct Config {
4343
pub default_env: Option<String>,
4444
pub target_tps: Option<f64>,

sidecar-ffi/src/lib.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use datadog_sidecar::config;
1414
use datadog_sidecar::config::LogMethod;
1515
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
1616
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
17+
use datadog_sidecar::service::agent_info::AgentInfoReader;
1718
use datadog_sidecar::service::{
1819
blocking::{self, SidecarTransport},
1920
InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
@@ -911,3 +912,35 @@ pub unsafe extern "C" fn ddog_sidecar_get_crashtracker_unix_socket_path() -> ffi
911912
buf.copy_from_slice(str.as_bytes());
912913
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
913914
}
915+
916+
/// Gets an agent info reader.
917+
#[no_mangle]
918+
#[allow(clippy::missing_safety_doc)]
919+
pub unsafe extern "C" fn ddog_get_agent_info_reader(endpoint: &Endpoint) -> Box<AgentInfoReader> {
920+
Box::new(AgentInfoReader::new(endpoint))
921+
}
922+
923+
/// Gets the current agent info environment (or empty if not existing)
924+
#[no_mangle]
925+
#[allow(clippy::missing_safety_doc)]
926+
pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
927+
reader: &'a mut AgentInfoReader,
928+
changed: &mut bool,
929+
) -> ffi::CharSlice<'a> {
930+
let (has_changed, info) = reader.read();
931+
*changed = has_changed;
932+
let config = if let Some(info) = info {
933+
info.config.as_ref()
934+
} else {
935+
None
936+
};
937+
config
938+
.and_then(|c| c.default_env.as_ref())
939+
.map(|s| ffi::CharSlice::from(s.as_str()))
940+
.unwrap_or(ffi::CharSlice::empty())
941+
}
942+
943+
/// Drops the agent info reader.
944+
#[no_mangle]
945+
#[allow(clippy::missing_safety_doc)]
946+
pub unsafe extern "C" fn ddog_drop_agent_info_reader(_: Box<AgentInfoReader>) {}

sidecar/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ ddcommon = { path = "../ddcommon" }
2121
datadog-sidecar-macros = { path = "macros" }
2222

2323
ddtelemetry = { path = "../ddtelemetry", features = ["tracing"] }
24+
data-pipeline = { path = "../data-pipeline" }
2425
datadog-trace-protobuf = { path = "../trace-protobuf" }
2526
datadog-trace-utils = { path = "../trace-utils" }
2627
datadog-trace-normalization = { path = "../trace-normalization" }

sidecar/src/service/agent_info.rs

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! This file contains code for fetching and sharing the info from the Datadog Agent.
5+
//! It will keep one fetcher per Endpoint. The SidecarServer is expected to keep the AgentInfoGuard
6+
//! alive for the lifetime of the session.
7+
//! The fetcher will remain alive for a short while after all guards have been dropped.
8+
//! It writes the raw agent response to shared memory at a fixed per-endpoint location, to be
9+
//! consumed by tracers.
10+
11+
use crate::one_way_shared_memory::{open_named_shm, OneWayShmReader, OneWayShmWriter};
12+
use crate::primary_sidecar_identifier;
13+
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
14+
use base64::Engine;
15+
use data_pipeline::agent_info::schema::AgentInfoStruct;
16+
use data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
17+
use datadog_ipc::platform::NamedShmHandle;
18+
use ddcommon::Endpoint;
19+
use futures::future::Shared;
20+
use futures::FutureExt;
21+
use http::uri::PathAndQuery;
22+
use manual_future::ManualFuture;
23+
use std::ffi::CString;
24+
use std::hash::{Hash, Hasher};
25+
use std::sync::{Arc, Mutex};
26+
use std::time::{Duration, Instant};
27+
use tokio::time::sleep;
28+
use tracing::{error, warn};
29+
use zwohash::{HashMap, ZwoHasher};
30+
31+
#[derive(Default, Clone)]
32+
pub struct AgentInfos(Arc<Mutex<HashMap<Endpoint, AgentInfoFetcher>>>);
33+
34+
impl AgentInfos {
35+
/// Ensures a fetcher for the endpoint's agent info and keeps it alive for at least as long as
36+
/// the returned guard exists.
37+
pub fn query_for(&self, endpoint: Endpoint) -> AgentInfoGuard {
38+
let mut infos_guard = self.0.lock().unwrap();
39+
if let Some(info) = infos_guard.get_mut(&endpoint) {
40+
info.rc += 1;
41+
} else {
42+
infos_guard.insert(
43+
endpoint.clone(),
44+
AgentInfoFetcher::new(self.clone(), endpoint.clone()),
45+
);
46+
}
47+
48+
AgentInfoGuard {
49+
infos: self.clone(),
50+
endpoint,
51+
}
52+
}
53+
}
54+
55+
pub struct AgentInfoGuard {
56+
infos: AgentInfos,
57+
endpoint: Endpoint,
58+
}
59+
60+
impl AgentInfoGuard {
61+
pub fn get(&self) -> Shared<ManualFuture<AgentInfoStruct>> {
62+
let infos_guard = self.infos.0.lock().unwrap();
63+
let infos = infos_guard.get(&self.endpoint).unwrap();
64+
infos.infos.clone()
65+
}
66+
}
67+
68+
impl Drop for AgentInfoGuard {
69+
fn drop(&mut self) {
70+
let mut infos_guard = self.infos.0.lock().unwrap();
71+
let info = infos_guard.get_mut(&self.endpoint).unwrap();
72+
info.last_update = Instant::now();
73+
info.rc -= 1;
74+
}
75+
}
76+
77+
pub struct AgentInfoFetcher {
78+
/// Once the last_update is too old, we'll stop the fetcher.
79+
last_update: Instant,
80+
/// Will be kept alive forever if rc > 0.
81+
rc: u32,
82+
/// The initial fetch is an unresolved future (to be able to await on it), subsequent fetches
83+
/// are simply directly replacing this with a resolved future.
84+
infos: Shared<ManualFuture<AgentInfoStruct>>,
85+
}
86+
87+
impl AgentInfoFetcher {
88+
fn new(agent_infos: AgentInfos, endpoint: Endpoint) -> AgentInfoFetcher {
89+
let (future, completer) = ManualFuture::new();
90+
tokio::spawn(async move {
91+
let mut state: Option<String> = None;
92+
let mut writer = None;
93+
let mut completer = Some(completer);
94+
let mut fetch_endpoint = endpoint.clone();
95+
let mut parts = fetch_endpoint.url.into_parts();
96+
parts.path_and_query = Some(PathAndQuery::from_static("/info"));
97+
fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap();
98+
loop {
99+
let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await;
100+
let mut complete_fut = None;
101+
{
102+
let mut infos_guard = agent_infos.0.lock().unwrap();
103+
let infos = infos_guard.get_mut(&endpoint).unwrap();
104+
if infos.rc == 0 && infos.last_update.elapsed().as_secs() > 60 {
105+
break;
106+
}
107+
match fetched {
108+
Ok(FetchInfoStatus::SameState) => {}
109+
Ok(FetchInfoStatus::NewState(status)) => {
110+
state = Some(status.state_hash);
111+
if writer.is_none() {
112+
writer = match OneWayShmWriter::<NamedShmHandle>::new(info_path(
113+
&endpoint,
114+
)) {
115+
Ok(writer) => Some(writer),
116+
Err(e) => {
117+
error!("Failed acquiring an agent info writer: {e:?}");
118+
None
119+
}
120+
};
121+
}
122+
if let Some(ref writer) = writer {
123+
writer.write(&serde_json::to_vec(&status.info).unwrap())
124+
}
125+
if let Some(completer) = completer {
126+
complete_fut = Some(completer.complete(status.info));
127+
} else {
128+
infos.infos = ManualFuture::new_completed(status.info).shared();
129+
}
130+
completer = None;
131+
}
132+
Err(e) => {
133+
// We'll just return the old values as long as the endpoint is
134+
// unreachable.
135+
warn!(
136+
"The agent info for {} could not be fetched: {}",
137+
fetch_endpoint.url, e
138+
);
139+
}
140+
}
141+
}
142+
if let Some(complete_fut) = complete_fut.take() {
143+
complete_fut.await;
144+
}
145+
sleep(Duration::from_secs(60)).await;
146+
}
147+
agent_infos.0.lock().unwrap().remove(&endpoint);
148+
});
149+
150+
AgentInfoFetcher {
151+
last_update: Instant::now(),
152+
rc: 1,
153+
infos: future.shared(),
154+
}
155+
}
156+
}
157+
158+
fn info_path(endpoint: &Endpoint) -> CString {
159+
let mut hasher = ZwoHasher::default();
160+
endpoint.hash(&mut hasher);
161+
let mut path = format!(
162+
"/ddinf{}-{}",
163+
primary_sidecar_identifier(),
164+
BASE64_URL_SAFE_NO_PAD.encode(hasher.finish().to_ne_bytes()),
165+
);
166+
// datadog agent info, on macos we're restricted to 31 chars
167+
path.truncate(31); // should not be larger than 31 chars, but be sure.
168+
CString::new(path).unwrap()
169+
}
170+
171+
pub struct AgentInfoReader {
172+
reader: OneWayShmReader<NamedShmHandle, CString>,
173+
info: Option<AgentInfoStruct>,
174+
}
175+
176+
impl AgentInfoReader {
177+
pub fn new(endpoint: &Endpoint) -> AgentInfoReader {
178+
let path = info_path(endpoint);
179+
AgentInfoReader {
180+
reader: OneWayShmReader::new(open_named_shm(&path).ok(), path),
181+
info: None,
182+
}
183+
}
184+
185+
pub fn read(&mut self) -> (bool, &Option<AgentInfoStruct>) {
186+
let (updated, data) = self.reader.read();
187+
if updated {
188+
match serde_json::from_slice(data) {
189+
Ok(info) => self.info = Some(info),
190+
Err(e) => error!("Failed deserializing the agent info: {e:?}"),
191+
}
192+
}
193+
(updated, &self.info)
194+
}
195+
}
196+
197+
#[cfg(test)]
198+
mod tests {
199+
use super::*;
200+
use httpmock::prelude::*;
201+
202+
const TEST_INFO: &str = r#"{
203+
"config": {
204+
"default_env": "testenv"
205+
}
206+
}"#;
207+
208+
const TEST_INFO_HASH: &str = "8c732aba385d605b010cd5bd12c03fef402eaefce989f0055aa4c7e92fe30077";
209+
210+
#[cfg_attr(miri, ignore)]
211+
#[tokio::test]
212+
async fn test_fetch_info_without_state() {
213+
let server = MockServer::start();
214+
let mock = server
215+
.mock_async(|when, then| {
216+
when.path("/info");
217+
then.status(200)
218+
.header("content-type", "application/json")
219+
.header("datadog-agent-state", TEST_INFO_HASH)
220+
.body(TEST_INFO);
221+
})
222+
.await;
223+
let endpoint = Endpoint::from_url(server.url("/").parse().unwrap());
224+
let agent_infos = AgentInfos::default();
225+
226+
let mut reader = AgentInfoReader::new(&endpoint);
227+
assert_eq!(reader.read(), (false, &None));
228+
229+
let info = agent_infos.query_for(endpoint).get().await;
230+
mock.assert();
231+
assert_eq!(
232+
info.config.unwrap().default_env,
233+
Some("testenv".to_string())
234+
);
235+
236+
let (updated, info) = reader.read();
237+
assert!(updated);
238+
assert_eq!(
239+
info.as_ref().unwrap().config.as_ref().unwrap().default_env,
240+
Some("testenv".to_string())
241+
);
242+
}
243+
}

sidecar/src/service/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use runtime_info::RuntimeInfo;
2727
use session_info::SessionInfo;
2828
use sidecar_interface::{SidecarInterface, SidecarInterfaceRequest, SidecarInterfaceResponse};
2929

30+
pub mod agent_info;
3031
pub mod blocking;
3132
pub mod exception_hash_rate_limiter;
3233
mod instance_id;

sidecar/src/service/session_info.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use tracing::{debug, error, info, trace};
1818
use crate::log::{MultiEnvFilterGuard, MultiWriterGuard};
1919
use crate::{spawn_map_err, tracer};
2020

21+
use crate::service::agent_info::AgentInfoGuard;
2122
use crate::service::{InstanceId, QueueId, RuntimeInfo};
23+
2224
/// `SessionInfo` holds information about a session.
2325
///
2426
/// It contains a list of runtimes, session configuration, tracer configuration, and log guards.
@@ -31,6 +33,7 @@ pub(crate) struct SessionInfo {
3133
tracer_config: Arc<Mutex<tracer::Config>>,
3234
dogstatsd: Arc<Mutex<Option<dogstatsd_client::Client>>>,
3335
remote_config_invariants: Arc<Mutex<Option<ConfigInvariants>>>,
36+
pub(crate) agent_infos: Arc<Mutex<Option<AgentInfoGuard>>>,
3437
pub(crate) remote_config_interval: Arc<Mutex<Duration>>,
3538
#[cfg(windows)]
3639
pub(crate) remote_config_notify_function:
@@ -50,6 +53,7 @@ impl Clone for SessionInfo {
5053
tracer_config: self.tracer_config.clone(),
5154
dogstatsd: self.dogstatsd.clone(),
5255
remote_config_invariants: self.remote_config_invariants.clone(),
56+
agent_infos: self.agent_infos.clone(),
5357
remote_config_interval: self.remote_config_interval.clone(),
5458
#[cfg(windows)]
5559
remote_config_notify_function: self.remote_config_notify_function.clone(),

0 commit comments

Comments
 (0)