Skip to content

Commit 2324ea6

Browse files
committed
refactor: turn escrow account monitoring into an eventual
1 parent fd32cad commit 2324ea6

File tree

6 files changed

+200
-248
lines changed

6 files changed

+200
-248
lines changed

common/src/escrow_accounts.rs

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright 2023-, GraphOps and Semiotic Labs.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::{collections::HashMap, time::Duration};
5+
6+
use alloy_primitives::Address;
7+
use anyhow::Result;
8+
use ethers_core::types::U256;
9+
use eventuals::{timer, Eventual, EventualExt};
10+
use log::warn;
11+
use serde::Deserialize;
12+
use serde_json::json;
13+
use tokio::time::sleep;
14+
15+
use crate::prelude::SubgraphClient;
16+
17+
pub fn escrow_accounts(
18+
escrow_subgraph: &'static SubgraphClient,
19+
indexer_address: Address,
20+
interval: Duration,
21+
) -> Eventual<HashMap<Address, U256>> {
22+
// Types for deserializing the network subgraph response
23+
#[derive(Deserialize)]
24+
#[serde(rename_all = "camelCase")]
25+
struct EscrowAccountsResponse {
26+
escrow_accounts: Vec<EscrowAccount>,
27+
}
28+
// These 2 structs are used to deserialize the response from the escrow subgraph.
29+
// Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal
30+
// representation. This is why we deserialize them as strings below.
31+
#[derive(Deserialize)]
32+
#[serde(rename_all = "camelCase")]
33+
struct EscrowAccount {
34+
balance: String,
35+
total_amount_thawing: String,
36+
sender: Sender,
37+
}
38+
#[derive(Deserialize)]
39+
#[serde(rename_all = "camelCase")]
40+
struct Sender {
41+
id: Address,
42+
}
43+
44+
timer(interval).map_with_retry(
45+
move |_| async move {
46+
let response = escrow_subgraph
47+
.query::<EscrowAccountsResponse>(&json!({
48+
"query": r#"
49+
query ($indexer: ID!) {
50+
escrowAccounts(where: {receiver_: {id: $indexer}}) {
51+
balance
52+
totalAmountThawing
53+
sender {
54+
id
55+
}
56+
}
57+
}
58+
"#,
59+
"variables": {
60+
"indexer": indexer_address,
61+
}
62+
}
63+
))
64+
.await
65+
.map_err(|e| e.to_string())?;
66+
67+
// If there are any GraphQL errors returned, we'll log them for debugging
68+
if let Some(errors) = response.errors {
69+
warn!(
70+
"Errors encountered fetching escrow accounts for indexer {:?}: {}",
71+
indexer_address,
72+
errors
73+
.into_iter()
74+
.map(|e| e.message)
75+
.collect::<Vec<_>>()
76+
.join(", ")
77+
);
78+
}
79+
80+
let sender_accounts = response
81+
.data
82+
.map_or(vec![], |data| data.escrow_accounts)
83+
.iter()
84+
.map(|account| {
85+
let balance = U256::checked_sub(
86+
U256::from_dec_str(&account.balance)?,
87+
U256::from_dec_str(&account.total_amount_thawing)?,
88+
)
89+
.unwrap_or_else(|| {
90+
warn!(
91+
"Balance minus total amount thawing underflowed for account {}. \
92+
Setting balance to 0, no queries will be served for this sender.",
93+
account.sender.id
94+
);
95+
U256::from(0)
96+
});
97+
98+
Ok((account.sender.id, balance))
99+
})
100+
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
101+
.map_err(|e| format!("{}", e))?;
102+
103+
Ok(sender_accounts)
104+
},
105+
move |err: String| {
106+
warn!(
107+
"Failed to fetch escrow accounts for indexer {:?}: {}",
108+
indexer_address, err
109+
);
110+
111+
sleep(interval.div_f32(2.0))
112+
},
113+
)
114+
}
115+
116+
#[cfg(test)]
117+
mod tests {
118+
use wiremock::matchers::{method, path};
119+
use wiremock::{Mock, MockServer, ResponseTemplate};
120+
121+
use crate::test_vectors;
122+
123+
use super::*;
124+
125+
#[tokio::test]
126+
async fn test_current_accounts() {
127+
// Set up a mock escrow subgraph
128+
let mock_server = MockServer::start().await;
129+
let escrow_subgraph_endpoint = SubgraphClient::local_deployment_endpoint(
130+
&mock_server.uri(),
131+
&test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT,
132+
)
133+
.unwrap();
134+
let escrow_subgraph = Box::leak(Box::new(
135+
SubgraphClient::new(
136+
Some(&mock_server.uri()),
137+
Some(&test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT),
138+
escrow_subgraph_endpoint.as_ref(),
139+
)
140+
.unwrap(),
141+
));
142+
143+
let mock = Mock::given(method("POST"))
144+
.and(path(format!(
145+
"/subgraphs/id/{}",
146+
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
147+
)))
148+
.respond_with(
149+
ResponseTemplate::new(200)
150+
.set_body_raw(test_vectors::ESCROW_QUERY_RESPONSE, "application/json"),
151+
);
152+
mock_server.register(mock).await;
153+
154+
let accounts = escrow_accounts(
155+
escrow_subgraph,
156+
*test_vectors::INDEXER_ADDRESS,
157+
Duration::from_secs(60),
158+
);
159+
160+
assert_eq!(
161+
accounts.value().await.unwrap(),
162+
*test_vectors::ESCROW_ACCOUNTS
163+
);
164+
}
165+
}

common/src/escrow_monitor.rs

Lines changed: 0 additions & 222 deletions
This file was deleted.

0 commit comments

Comments
 (0)