Skip to content

Commit 9db2592

Browse files
committed
refactor(agent): move keypairs into services
1 parent 669cb91 commit 9db2592

File tree

4 files changed

+241
-229
lines changed

4 files changed

+241
-229
lines changed

src/agent.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl Agent {
159159

160160
// Spawn the remote keypair loader endpoint for both networks
161161
jhs.append(
162-
&mut state::keypairs::spawn(
162+
&mut services::keypairs(
163163
self.config.primary_network.rpc_url.clone(),
164164
self.config
165165
.secondary_network
@@ -183,6 +183,7 @@ pub mod config {
183183
super::{
184184
metrics,
185185
pyth,
186+
services,
186187
solana::network,
187188
state,
188189
},
@@ -211,7 +212,7 @@ pub mod config {
211212
#[serde(default)]
212213
pub metrics_server: metrics::Config,
213214
#[serde(default)]
214-
pub remote_keypair_loader: state::keypairs::Config,
215+
pub remote_keypair_loader: services::keypairs::Config,
215216
}
216217

217218
impl Config {

src/agent/services.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1-
mod notifier;
1+
pub mod keypairs;
2+
pub mod notifier;
23

3-
pub use notifier::notifier;
4+
pub use {
5+
keypairs::keypairs,
6+
notifier::notifier,
7+
};

src/agent/services/keypairs.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
//! Keypairs
2+
//!
3+
//! The Keypairs Service allows hotloading keys for the running agent.
4+
5+
use {
6+
crate::agent::{
7+
solana::network::Network,
8+
state::keypairs::Keypairs,
9+
},
10+
anyhow::{
11+
Context,
12+
Result,
13+
},
14+
serde::Deserialize,
15+
solana_client::nonblocking::rpc_client::RpcClient,
16+
solana_sdk::{
17+
commitment_config::CommitmentConfig,
18+
signature::Keypair,
19+
signer::Signer,
20+
},
21+
std::{
22+
net::SocketAddr,
23+
sync::Arc,
24+
},
25+
tokio::task::JoinHandle,
26+
warp::{
27+
hyper::StatusCode,
28+
reject::Rejection,
29+
reply::{
30+
self,
31+
WithStatus,
32+
},
33+
Filter,
34+
},
35+
};
36+
37+
const DEFAULT_MIN_KEYPAIR_BALANCE_SOL: u64 = 1;
38+
39+
pub fn default_bind_address() -> SocketAddr {
40+
"127.0.0.1:9001"
41+
.parse()
42+
.expect("INTERNAL: Could not build default remote keypair loader bind address")
43+
}
44+
45+
#[derive(Clone, Debug, Deserialize)]
46+
#[serde(default)]
47+
pub struct Config {
48+
primary_min_keypair_balance_sol: u64,
49+
secondary_min_keypair_balance_sol: u64,
50+
bind_address: SocketAddr,
51+
}
52+
53+
impl Default for Config {
54+
fn default() -> Self {
55+
Self {
56+
primary_min_keypair_balance_sol: DEFAULT_MIN_KEYPAIR_BALANCE_SOL,
57+
secondary_min_keypair_balance_sol: DEFAULT_MIN_KEYPAIR_BALANCE_SOL,
58+
bind_address: default_bind_address(),
59+
}
60+
}
61+
}
62+
63+
pub async fn keypairs<S>(
64+
primary_rpc_url: String,
65+
secondary_rpc_url: Option<String>,
66+
config: Config,
67+
state: Arc<S>,
68+
) -> Vec<JoinHandle<()>>
69+
where
70+
S: Keypairs,
71+
S: Send + Sync + 'static,
72+
{
73+
let ip = config.bind_address.ip();
74+
75+
if !ip.is_loopback() {
76+
tracing::warn!(
77+
bind_address = ?config.bind_address,
78+
"Remote key loader: bind address is not localhost. Make sure the access on the selected address is secure.",
79+
);
80+
}
81+
82+
let primary_upload_route = {
83+
let state = state.clone();
84+
let rpc_url = primary_rpc_url.clone();
85+
let min_balance = config.primary_min_keypair_balance_sol;
86+
warp::path!("primary" / "load_keypair")
87+
.and(warp::post())
88+
.and(warp::body::content_length_limit(1024))
89+
.and(warp::body::json())
90+
.and(warp::path::end())
91+
.and_then(move |kp: Vec<u8>| {
92+
let state = state.clone();
93+
let rpc_url = rpc_url.clone();
94+
async move {
95+
let response = handle_new_keypair(
96+
state,
97+
Network::Primary,
98+
kp,
99+
min_balance,
100+
rpc_url,
101+
"primary",
102+
)
103+
.await;
104+
Result::<WithStatus<_>, Rejection>::Ok(response)
105+
}
106+
})
107+
};
108+
109+
let secondary_upload_route = warp::path!("secondary" / "load_keypair")
110+
.and(warp::post())
111+
.and(warp::body::content_length_limit(1024))
112+
.and(warp::body::json())
113+
.and(warp::path::end())
114+
.and_then(move |kp: Vec<u8>| {
115+
let state = state.clone();
116+
let rpc_url = secondary_rpc_url.clone();
117+
async move {
118+
if let Some(rpc_url) = rpc_url {
119+
let min_balance = config.secondary_min_keypair_balance_sol;
120+
let response = handle_new_keypair(
121+
state,
122+
Network::Secondary,
123+
kp,
124+
min_balance,
125+
rpc_url,
126+
"secondary",
127+
)
128+
.await;
129+
Result::<WithStatus<_>, Rejection>::Ok(response)
130+
} else {
131+
Result::<WithStatus<_>, Rejection>::Ok(reply::with_status(
132+
"Secondary network is not active",
133+
StatusCode::SERVICE_UNAVAILABLE,
134+
))
135+
}
136+
}
137+
});
138+
139+
let http_api_jh = {
140+
let (_, serve) = warp::serve(primary_upload_route.or(secondary_upload_route))
141+
.bind_with_graceful_shutdown(config.bind_address, async {
142+
let _ = crate::agent::EXIT.subscribe().changed().await;
143+
});
144+
tokio::spawn(serve)
145+
};
146+
147+
// WARNING: All jobs spawned here must report their join handles in this vec
148+
vec![http_api_jh]
149+
}
150+
151+
/// Validate and apply a keypair to the specified mut reference,
152+
/// hiding errors in logs.
153+
///
154+
/// Returns the appropriate HTTP response depending on checks success.
155+
///
156+
/// NOTE(2023-03-22): Lifetime bounds are currently necessary
157+
/// because of https://github.com/rust-lang/rust/issues/63033
158+
async fn handle_new_keypair<'a, 'b: 'a, S>(
159+
state: Arc<S>,
160+
network: Network,
161+
new_keypair_bytes: Vec<u8>,
162+
min_keypair_balance_sol: u64,
163+
rpc_url: String,
164+
network_name: &'b str,
165+
) -> WithStatus<&'static str>
166+
where
167+
S: Keypairs,
168+
{
169+
let mut upload_ok = true;
170+
match Keypair::from_bytes(&new_keypair_bytes) {
171+
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await {
172+
Ok(()) => {
173+
Keypairs::update_keypair(&*state, network, kp).await;
174+
}
175+
Err(e) => {
176+
tracing::warn!(
177+
network = network_name,
178+
error = e.to_string(),
179+
"Remote keypair loader: Keypair failed validation",
180+
);
181+
upload_ok = false;
182+
}
183+
},
184+
Err(e) => {
185+
tracing::warn!(
186+
network = network_name,
187+
error = e.to_string(),
188+
"Remote keypair loader: Keypair failed validation",
189+
);
190+
upload_ok = false;
191+
}
192+
}
193+
194+
if upload_ok {
195+
reply::with_status("keypair upload OK", StatusCode::OK)
196+
} else {
197+
reply::with_status(
198+
"Could not upload keypair. See logs for details.",
199+
StatusCode::BAD_REQUEST,
200+
)
201+
}
202+
}
203+
204+
/// Validate keypair balance before using it in transactions.
205+
pub async fn validate_keypair(
206+
kp: &Keypair,
207+
min_keypair_balance_sol: u64,
208+
rpc_url: String,
209+
) -> Result<()> {
210+
let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
211+
212+
let balance_lamports = c
213+
.get_balance(&kp.pubkey())
214+
.await
215+
.context("Could not check keypair's balance")?;
216+
217+
let lamports_in_sol = 1_000_000_000;
218+
219+
if balance_lamports > min_keypair_balance_sol * lamports_in_sol {
220+
Ok(())
221+
} else {
222+
Err(anyhow::anyhow!(format!(
223+
"Keypair {} balance of {} SOL below threshold of {} SOL",
224+
kp.pubkey(),
225+
balance_lamports as f64 / lamports_in_sol as f64,
226+
min_keypair_balance_sol
227+
)))
228+
}
229+
}

0 commit comments

Comments
 (0)