11// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ 
22// SPDX-License-Identifier: Apache-2.0 
3+ 
4+ //! This file contains code for fetching and sharing the info from the Datadog Agent. 
5+ //! It will keep one fetcher per Endpoint. The SidecarServer is expected to keep the AgentInfoGuard 
6+ //! alive for the lifetime of the session. 
7+ //! The fetcher will remain alive for a short while after all guards have been dropped. 
8+ //! It writes the raw agent response to shared memory at a fixed per-endpoint location, to be 
9+ //! consumed be tracers. 
10+ 
311use  crate :: one_way_shared_memory:: { open_named_shm,  OneWayShmReader ,  OneWayShmWriter } ; 
412use  crate :: primary_sidecar_identifier; 
513use  base64:: prelude:: BASE64_URL_SAFE_NO_PAD ; 
@@ -24,6 +32,8 @@ use zwohash::{HashMap, ZwoHasher};
2432pub  struct  AgentInfos ( Arc < Mutex < HashMap < Endpoint ,  AgentInfoFetcher > > > ) ; 
2533
2634impl  AgentInfos  { 
35+     /// Ensures a fetcher for the endpoints agent info and keeps it alive for at least as long as 
36+      /// the returned guard exists. 
2737     pub  fn  query_for ( & self ,  endpoint :  Endpoint )  -> AgentInfoGuard  { 
2838        let  mut  infos_guard = self . 0 . lock ( ) . unwrap ( ) ; 
2939        if  let  Some ( info)  = infos_guard. get_mut ( & endpoint)  { 
@@ -65,8 +75,12 @@ impl Drop for AgentInfoGuard {
6575} 
6676
6777pub  struct  AgentInfoFetcher  { 
78+     /// Once the last_update is too old, we'll stop the fetcher. 
6879     last_update :  Instant , 
80+     /// Will be kept alive forever if rc > 0. 
6981     rc :  u32 , 
82+     /// The initial fetch is an unresolved future (to be able to await on it), subsequent fetches 
83+      /// are simply directly replacing this with a resolved future. 
7084     infos :  Shared < ManualFuture < AgentInfoStruct > > , 
7185} 
7286
@@ -116,6 +130,8 @@ impl AgentInfoFetcher {
116130                            completer = None ; 
117131                        } 
118132                        Err ( e)  => { 
133+                             // We'll just return the old values as long as the endpoint is 
134+                             // unreachable. 
119135                            warn ! ( 
120136                                "The agent info for {} could not be fetched: {}" , 
121137                                fetch_endpoint. url,  e
@@ -177,3 +193,51 @@ impl AgentInfoReader {
177193        ( updated,  & self . info ) 
178194    } 
179195} 
196+ 
197+ #[ cfg( test) ]  
198+ mod  tests { 
199+     use  super :: * ; 
200+     use  httpmock:: prelude:: * ; 
201+ 
202+     const  TEST_INFO :  & str  = r#"{ 
203+         "config": { 
204+             "default_env": "testenv" 
205+         } 
206+         }"# ; 
207+ 
208+     const  TEST_INFO_HASH :  & str  = "8c732aba385d605b010cd5bd12c03fef402eaefce989f0055aa4c7e92fe30077" ; 
209+ 
210+     #[ cfg_attr( miri,  ignore) ]  
211+     #[ tokio:: test]  
212+     async  fn  test_fetch_info_without_state ( )  { 
213+         let  server = MockServer :: start ( ) ; 
214+         let  mock = server
215+             . mock_async ( |when,  then| { 
216+                 when. path ( "/info" ) ; 
217+                 then. status ( 200 ) 
218+                     . header ( "content-type" ,  "application/json" ) 
219+                     . header ( "datadog-agent-state" ,  TEST_INFO_HASH ) 
220+                     . body ( TEST_INFO ) ; 
221+             } ) 
222+             . await ; 
223+         let  endpoint = Endpoint :: from_url ( server. url ( "/" ) . parse ( ) . unwrap ( ) ) ; 
224+         let  agent_infos = AgentInfos :: default ( ) ; 
225+ 
226+         let  mut  reader = AgentInfoReader :: new ( & endpoint) ; 
227+         assert_eq ! ( reader. read( ) ,  ( false ,  & None ) ) ; 
228+ 
229+         let  info = agent_infos. query_for ( endpoint) . get ( ) . await ; 
230+         mock. assert ( ) ; 
231+         assert_eq ! ( 
232+             info. config. unwrap( ) . default_env, 
233+             Some ( "testenv" . to_string( ) ) 
234+         ) ; 
235+ 
236+         let  ( updated,  info)  = reader. read ( ) ; 
237+         assert_eq ! ( updated,  true ) ; 
238+         assert_eq ! ( 
239+             info. as_ref( ) . unwrap( ) . config. as_ref( ) . unwrap( ) . default_env, 
240+             Some ( "testenv" . to_string( ) ) 
241+         ) ; 
242+     } 
243+ } 
0 commit comments