Skip to content

Commit a8ccdab

Browse files
committed
refactor(agent): convert to a tracing logger
1 parent 663a137 commit a8ccdab

File tree

16 files changed

+369
-648
lines changed

16 files changed

+369
-648
lines changed

Cargo.lock

Lines changed: 94 additions & 230 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,11 @@ solana-account-decoder = "1.18.8"
3333
solana-client = "1.18.8"
3434
solana-sdk = "1.18.8"
3535
bincode = "1.3.3"
36-
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_trace"] }
37-
slog-term = "2.9.1"
3836
rand = "0.8.5"
39-
slog-async = "2.8.0"
4037
config = "0.14.0"
4138
thiserror = "1.0.58"
4239
clap = { version = "4.5.4", features = ["derive"] }
4340
humantime-serde = "1.1.1"
44-
slog-envlogger = "2.2.0"
4541
serde-this-or-that = "0.4.2"
4642
# The public typed-html 0.2.2 release is causing a recursion limit
4743
# error that cannot be fixed from outside the crate.
@@ -52,17 +48,17 @@ humantime = "2.1.0"
5248
prometheus-client = "0.22.2"
5349
lazy_static = "1.4.0"
5450
toml_edit = "0.22.9"
55-
slog-bunyan = "2.5.0"
5651
winnow = "0.6.5"
5752
proptest = "1.4.0"
53+
tracing = { version = "0.1.40", features = ["log"] }
54+
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
5855

5956
[dev-dependencies]
6057
tokio-util = { version = "0.7.10", features = ["full"] }
6158
soketto = "0.8.0"
6259
portpicker = "0.1.1"
6360
rand = "0.8.5"
6461
tokio-retry = "0.3.0"
65-
slog-extlog = "8.1.0"
6662
iobuffer = "0.2.0"
6763

6864
[profile.release]

integration-tests/tests/test_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ async def test_update_price_discards_unpermissioned(self, client: PythAgentClien
695695
lines_found += 1
696696
expected_unperm_pubkey = final_price_account_unperm["account"]
697697
# Must point at the expected account as all other attempts must be valid
698-
assert f"price_account: {expected_unperm_pubkey}" in line
698+
assert f'"unpermissioned_price_account":"{expected_unperm_pubkey}"' in line
699699

700700
# Must find at least one log discarding the account
701701
assert lines_found > 0

src/agent.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ use {
7171
anyhow::Result,
7272
futures_util::future::join_all,
7373
lazy_static::lazy_static,
74-
slog::Logger,
7574
std::sync::Arc,
7675
tokio::sync::watch,
7776
};
@@ -105,31 +104,33 @@ impl Agent {
105104
Agent { config }
106105
}
107106

108-
pub async fn start(&self, logger: Logger) {
109-
info!(logger, "Starting {}", env!("CARGO_PKG_NAME");
110-
"config" => format!("{:?}", &self.config),
111-
"version" => env!("CARGO_PKG_VERSION"),
112-
"cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("<could not get current directory>".to_owned())
107+
pub async fn start(&self) {
108+
tracing::info!(
109+
config = format!("{:?}", &self.config),
110+
version = env!("CARGO_PKG_VERSION"),
111+
cwd = std::env::current_dir()
112+
.map(|p| format!("{}", p.display()))
113+
.unwrap_or("<could not get current directory>".to_owned()),
114+
"Starting {}",
115+
env!("CARGO_PKG_NAME"),
113116
);
114117

115-
if let Err(err) = self.spawn(logger.clone()).await {
116-
error!(logger, "{}", err);
117-
debug!(logger, "error context"; "context" => format!("{:?}", err));
118+
if let Err(err) = self.spawn().await {
119+
tracing::error!(err = ?err, "Agent spawn failed.");
118120
};
119121
}
120122

121-
async fn spawn(&self, logger: Logger) -> Result<()> {
123+
async fn spawn(&self) -> Result<()> {
122124
// job handles
123125
let mut jhs = vec![];
124126

125127
// Create the Application State.
126-
let state = Arc::new(state::State::new(self.config.state.clone(), logger.clone()).await);
128+
let state = Arc::new(state::State::new(self.config.state.clone()).await);
127129

128130
// Spawn the primary network
129131
jhs.extend(network::spawn_network(
130132
self.config.primary_network.clone(),
131133
network::Network::Primary,
132-
logger.new(o!("primary" => true)),
133134
state.clone(),
134135
)?);
135136

@@ -138,26 +139,22 @@ impl Agent {
138139
jhs.extend(network::spawn_network(
139140
config.clone(),
140141
network::Network::Secondary,
141-
logger.new(o!("primary" => false)),
142142
state.clone(),
143143
)?);
144144
}
145145

146146
// Create the Notifier task for the Pythd RPC.
147-
jhs.push(tokio::spawn(notifier(logger.clone(), state.clone())));
147+
jhs.push(tokio::spawn(notifier(state.clone())));
148148

149149
// Spawn the Pythd API Server
150150
jhs.push(tokio::spawn(rpc::run(
151151
self.config.pythd_api_server.clone(),
152-
logger.clone(),
153152
state.clone(),
154153
)));
155154

156155
// Spawn the metrics server
157-
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
156+
jhs.push(tokio::spawn(metrics::spawn(
158157
self.config.metrics_server.bind_address,
159-
logger.clone(),
160-
state.clone(),
161158
)));
162159

163160
// Spawn the remote keypair loader endpoint for both networks
@@ -169,7 +166,6 @@ impl Agent {
169166
.as_ref()
170167
.map(|c| c.rpc_url.clone()),
171168
self.config.remote_keypair_loader.clone(),
172-
logger,
173169
state,
174170
)
175171
.await,

src/agent/metrics.rs

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
use {
2-
super::state::{
3-
local::PriceInfo,
4-
State,
5-
},
2+
super::state::local::PriceInfo,
63
crate::agent::solana::oracle::PriceEntry,
74
lazy_static::lazy_static,
85
prometheus_client::{
@@ -18,15 +15,13 @@ use {
1815
registry::Registry,
1916
},
2017
serde::Deserialize,
21-
slog::Logger,
2218
solana_sdk::pubkey::Pubkey,
2319
std::{
2420
net::SocketAddr,
2521
sync::{
2622
atomic::AtomicU64,
2723
Arc,
2824
},
29-
time::Instant,
3025
},
3126
tokio::sync::Mutex,
3227
warp::{
@@ -61,53 +56,33 @@ lazy_static! {
6156
Arc::new(Mutex::new(<Registry>::default()));
6257
}
6358

64-
/// Internal metrics server state, holds state needed for serving
65-
/// metrics.
66-
pub struct MetricsServer {
67-
pub start_time: Instant,
68-
pub logger: Logger,
69-
pub state: Arc<State>,
70-
}
71-
72-
impl MetricsServer {
73-
/// Instantiate a metrics API.
74-
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, state: Arc<State>) {
75-
let server = MetricsServer {
76-
start_time: Instant::now(),
77-
logger,
78-
state,
79-
};
80-
81-
let shared_state = Arc::new(Mutex::new(server));
82-
let shared_state4metrics = shared_state.clone();
83-
let metrics_route = warp::path("metrics")
84-
.and(warp::path::end())
85-
.and_then(move || {
86-
let shared_state = shared_state4metrics.clone();
87-
async move {
88-
let locked_state = shared_state.lock().await;
89-
let mut buf = String::new();
90-
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
91-
.map_err(|e| -> Box<dyn std::error::Error> {
92-
e.into()
93-
})
94-
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
95-
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
96-
}).unwrap_or_else(|e| {
97-
error!(locked_state.logger, "Metrics: Could not gather metrics from registry"; "error" => e.to_string());
98-
Box::new(reply::with_status("Could not gather metrics. See logs for details".to_string(), StatusCode::INTERNAL_SERVER_ERROR))
99-
});
100-
101-
Result::<Box<dyn Reply>, Rejection>::Ok(response)
102-
}
103-
});
104-
105-
let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
106-
let _ = crate::agent::EXIT.subscribe().changed().await;
59+
/// Instantiate a metrics API.
60+
pub async fn spawn(addr: impl Into<SocketAddr> + 'static) {
61+
let metrics_route = warp::path("metrics")
62+
.and(warp::path::end())
63+
.and_then(move || async move {
64+
let mut buf = String::new();
65+
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
66+
.map_err(|e| -> Box<dyn std::error::Error> { e.into() })
67+
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
68+
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
69+
})
70+
.unwrap_or_else(|e| {
71+
tracing::error!(err = ?e, "Metrics: Could not gather metrics from registry");
72+
Box::new(reply::with_status(
73+
"Could not gather metrics. See logs for details".to_string(),
74+
StatusCode::INTERNAL_SERVER_ERROR,
75+
))
76+
});
77+
78+
Result::<Box<dyn Reply>, Rejection>::Ok(response)
10779
});
10880

109-
serve.await
110-
}
81+
let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
82+
let _ = crate::agent::EXIT.subscribe().changed().await;
83+
});
84+
85+
serve.await
11186
}
11287

11388
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]

0 commit comments

Comments
 (0)