Skip to content

Commit bbc140f

Browse files
authored
[wormhole-attester] Increase accuracy and improve logging (#653)
* Tiltfile: TILT_DOCKER_REGISTRY def behavior, re-add namespace_create * wormhole_attester/client v5.0.0: accuracy and logging opimisations * [BREAKING CHANGE] min_interval_secs switches to milliseconds under min_interval_ms * attestation jobs no longer use preflight checks - this includes a custom variant of send_and_confirm_transaction(), see util.rs for details * attestation error logging no longer pretty-prints the error structs ({:#?} became {:?})
1 parent 53c9654 commit bbc140f

File tree

8 files changed

+117
-23
lines changed

8 files changed

+117
-23
lines changed

Tiltfile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ namespace = os.environ.get("TILT_NAMESPACE", "development")
1111
load("ext://namespace", "namespace_create", "namespace_inject")
1212
load("ext://secret", "secret_yaml_generic")
1313

14-
default_registry(image_registry, single_name="development")
14+
namespace_create(namespace)
15+
16+
if image_registry:
17+
default_registry(image_registry, single_name="development")
1518

1619
allow_k8s_contexts(k8s_context())
1720

third_party/pyth/p2w_autoattest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@
114114
min_rpc_interval_ms: 0 # RIP RPC
115115
max_batch_jobs: 1000 # Where we're going there's no oomkiller
116116
default_attestation_conditions:
117-
min_interval_secs: 10
117+
min_interval_ms: 10000
118118
symbol_groups:
119119
- group_name: fast_interval_rate_limited
120120
conditions:
121-
min_interval_secs: 1
121+
min_interval_ms: 1000
122122
rate_limit_interval_secs: 2
123123
symbols:
124124
"""
@@ -144,7 +144,7 @@
144144
cfg_yaml += f"""
145145
- group_name: longer_interval_sensitive_changes
146146
conditions:
147-
min_interval_secs: 3
147+
min_interval_ms: 3000
148148
price_changed_bps: 300
149149
symbols:
150150
"""

wormhole_attester/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

wormhole_attester/client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-wormhole-attester-client"
3-
version = "4.1.0"
3+
version = "5.0.0"
44
edition = "2018"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

wormhole_attester/client/src/attestation_cfg.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,8 @@ pub const fn default_min_rpc_interval_ms() -> u64 {
314314
150
315315
}
316316

317-
pub const fn default_min_interval_secs() -> u64 {
318-
60
317+
pub const fn default_min_interval_ms() -> u64 {
318+
60_000
319319
}
320320

321321
pub const fn default_rate_limit_interval_secs() -> u32 {
@@ -335,8 +335,8 @@ pub struct AttestationConditions {
335335
/// Lower bound on attestation rate. Attestation is triggered
336336
/// unconditionally whenever the specified interval elapses since
337337
/// last attestation.
338-
#[serde(default = "default_min_interval_secs")]
339-
pub min_interval_secs: u64,
338+
#[serde(default = "default_min_interval_ms")]
339+
pub min_interval_ms: u64,
340340

341341
/// Upper bound on attestation rate. Attesting the same batch
342342
/// before this many seconds pass fails the tx. This limit is
@@ -370,7 +370,7 @@ impl AttestationConditions {
370370
// Bug trap for new fields that also need to be included in
371371
// the returned expression
372372
let AttestationConditions {
373-
min_interval_secs: _min_interval_secs,
373+
min_interval_ms: _min_interval_ms,
374374
max_batch_jobs: _max_batch_jobs,
375375
price_changed_bps,
376376
publish_time_min_delta_secs,
@@ -384,7 +384,7 @@ impl AttestationConditions {
384384
impl Default for AttestationConditions {
385385
fn default() -> Self {
386386
Self {
387-
min_interval_secs: default_min_interval_secs(),
387+
min_interval_ms: default_min_interval_ms(),
388388
max_batch_jobs: default_max_batch_jobs(),
389389
price_changed_bps: None,
390390
publish_time_min_delta_secs: None,
@@ -471,7 +471,7 @@ mod tests {
471471
let fastbois = SymbolGroupConfig {
472472
group_name: "fast bois".to_owned(),
473473
conditions: Some(AttestationConditions {
474-
min_interval_secs: 5,
474+
min_interval_ms: 5,
475475
..Default::default()
476476
}),
477477
symbols: vec![
@@ -489,7 +489,7 @@ mod tests {
489489
let slowbois = SymbolGroupConfig {
490490
group_name: "slow bois".to_owned(),
491491
conditions: Some(AttestationConditions {
492-
min_interval_secs: 200,
492+
min_interval_ms: 200,
493493
..Default::default()
494494
}),
495495
symbols: vec![
@@ -541,7 +541,7 @@ mod tests {
541541
let eth_dup_price_key = Pubkey::new_unique();
542542

543543
let attestation_conditions_1 = AttestationConditions {
544-
min_interval_secs: 5,
544+
min_interval_ms: 5,
545545
..Default::default()
546546
};
547547

@@ -584,7 +584,7 @@ mod tests {
584584
};
585585

586586
let default_attestation_conditions = AttestationConditions {
587-
min_interval_secs: 1,
587+
min_interval_ms: 1,
588588
..Default::default()
589589
};
590590

wormhole_attester/client/src/batch_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ impl<'a> BatchState {
5050

5151
// min interval
5252
if self.last_job_finished_at.elapsed()
53-
> Duration::from_secs(self.conditions.min_interval_secs)
53+
> Duration::from_millis(self.conditions.min_interval_ms)
5454
{
5555
ret = Some(format!(
5656
"minimum interval of {}s elapsed since last state change",
57-
self.conditions.min_interval_secs
57+
self.conditions.min_interval_ms
5858
));
5959
}
6060

wormhole_attester/client/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use {
22
pyth_wormhole_attester::error::AttesterCustomError,
3+
pyth_wormhole_attester_client::util::send_and_confirm_transaction_with_config,
4+
solana_client::rpc_config::RpcSendTransactionConfig,
35
solana_program::instruction::InstructionError,
46
solana_sdk::transaction::TransactionError,
57
};
@@ -683,7 +685,11 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
683685

684686
let tx_processing_start_time = Instant::now();
685687

686-
let sig = match rpc.send_and_confirm_transaction(&tx).await {
688+
let sig = match send_and_confirm_transaction_with_config(&rpc, &tx, RpcSendTransactionConfig {
689+
// Decreases probability of rate limit race conditions
690+
skip_preflight: true,
691+
..Default::default()
692+
}).await {
687693
Ok(s) => Ok(s),
688694
Err(e) => match e.get_transaction_error() {
689695
Some(TransactionError::InstructionError(_idx, InstructionError::Custom(code)))
@@ -750,7 +756,7 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
750756
.or_else(move |e| async move {
751757
// log any errors coming from the job
752758
warn!(
753-
"Batch {}/{}, group {:?} ERR: {:#?}",
759+
"Batch {}/{}, group {:?} ERR: {:?}",
754760
batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
755761
);
756762

wormhole_attester/client/src/util.rs

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ use {
66
trace,
77
},
88
prometheus::TextEncoder,
9+
solana_client::{
10+
client_error::Result as SolClientResult,
11+
nonblocking::rpc_client::RpcClient,
12+
rpc_config::RpcSendTransactionConfig,
13+
rpc_request::RpcError,
14+
},
15+
solana_sdk::{
16+
commitment_config::CommitmentConfig,
17+
signature::Signature,
18+
transaction::{
19+
uses_durable_nonce,
20+
Transaction,
21+
},
22+
},
923
std::{
1024
net::SocketAddr,
1125
ops::{
@@ -17,9 +31,12 @@ use {
1731
Instant,
1832
},
1933
},
20-
tokio::sync::{
21-
Mutex,
22-
MutexGuard,
34+
tokio::{
35+
sync::{
36+
Mutex,
37+
MutexGuard,
38+
},
39+
time::sleep,
2340
},
2441
warp::{
2542
reply,
@@ -179,3 +196,71 @@ pub async fn start_metrics_server(addr: impl Into<SocketAddr> + 'static) {
179196
.bind(addr)
180197
.await;
181198
}
199+
200+
/// WARNING: Copied verbatim from v1.10.31, be careful when bumping
201+
/// solana crate versions!
202+
///
203+
/// TODO(2023-03-02): Use an upstream method when
204+
/// it's available.
205+
///
206+
/// This method is almost identical to
207+
/// RpcClient::send_and_confirm_transaction(). The only difference is
208+
/// that we let the user specify the config and replace
209+
/// send_transaction() inside with
210+
/// send_transaction_with_config(). This variant is currently missing
211+
/// from solana_client.
212+
pub async fn send_and_confirm_transaction_with_config(
213+
client: &RpcClient,
214+
transaction: &Transaction,
215+
config: RpcSendTransactionConfig,
216+
) -> SolClientResult<Signature> {
217+
const SEND_RETRIES: usize = 1;
218+
const GET_STATUS_RETRIES: usize = usize::MAX;
219+
220+
'sending: for _ in 0..SEND_RETRIES {
221+
let signature = client
222+
.send_transaction_with_config(transaction, config)
223+
.await?;
224+
225+
let recent_blockhash = if uses_durable_nonce(transaction).is_some() {
226+
let (recent_blockhash, ..) = client
227+
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
228+
.await?;
229+
recent_blockhash
230+
} else {
231+
transaction.message.recent_blockhash
232+
};
233+
234+
for status_retry in 0..GET_STATUS_RETRIES {
235+
match client.get_signature_status(&signature).await? {
236+
Some(Ok(_)) => return Ok(signature),
237+
Some(Err(e)) => return Err(e.into()),
238+
None => {
239+
if !client
240+
.is_blockhash_valid(&recent_blockhash, CommitmentConfig::processed())
241+
.await?
242+
{
243+
// Block hash is not found by some reason
244+
break 'sending;
245+
} else if cfg!(not(test))
246+
// Ignore sleep at last step.
247+
&& status_retry < GET_STATUS_RETRIES
248+
{
249+
// Retry twice a second
250+
sleep(Duration::from_millis(500)).await;
251+
252+
continue;
253+
}
254+
}
255+
}
256+
}
257+
}
258+
259+
Err(RpcError::ForUser(
260+
"unable to confirm transaction. \
261+
This can happen in situations such as transaction expiration \
262+
and insufficient fee-payer funds"
263+
.to_string(),
264+
)
265+
.into())
266+
}

0 commit comments

Comments
 (0)