1
1
// Copyright 2023-, GraphOps and Semiotic Labs.
2
2
// SPDX-License-Identifier: Apache-2.0
3
3
4
+ use std:: collections:: HashMap ;
5
+ use std:: sync:: Arc ;
6
+
4
7
use alloy_primitives:: Address ;
5
8
use anyhow:: Result ;
6
- use ethereum_types :: U256 ;
7
- use log:: { error, info} ;
9
+ use ethers_core :: types :: U256 ;
10
+ use log:: { error, info, warn } ;
8
11
use serde:: Deserialize ;
9
12
use serde_json:: json;
10
- use std:: collections:: HashMap ;
11
- use std:: sync:: Arc ;
12
13
use tokio:: sync:: RwLock ;
13
- use toolshed:: thegraph:: DeploymentId ;
14
14
15
- use crate :: graph_node :: GraphNodeInstance ;
15
+ use crate :: prelude :: SubgraphClient ;
16
16
17
17
#[ derive( Debug ) ]
18
18
struct EscrowMonitorInner {
19
- graph_node : GraphNodeInstance ,
20
- escrow_subgraph_deployment : DeploymentId ,
19
+ escrow_subgraph : & ' static SubgraphClient ,
21
20
indexer_address : Address ,
22
21
interval_ms : u64 ,
23
22
sender_accounts : Arc < RwLock < HashMap < Address , U256 > > > ,
@@ -33,16 +32,14 @@ pub struct EscrowMonitor {
33
32
#[ cfg_attr( test, faux:: methods) ]
34
33
impl EscrowMonitor {
35
34
pub async fn new (
36
- graph_node : GraphNodeInstance ,
37
- escrow_subgraph_deployment : DeploymentId ,
35
+ escrow_subgraph : & ' static SubgraphClient ,
38
36
indexer_address : Address ,
39
37
interval_ms : u64 ,
40
38
) -> Result < Self > {
41
39
let sender_accounts = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
42
40
43
41
let inner = Arc :: new ( EscrowMonitorInner {
44
- graph_node,
45
- escrow_subgraph_deployment,
42
+ escrow_subgraph,
46
43
indexer_address,
47
44
interval_ms,
48
45
sender_accounts,
@@ -61,30 +58,34 @@ impl EscrowMonitor {
61
58
}
62
59
63
60
async fn current_accounts (
64
- graph_node : & GraphNodeInstance ,
65
- escrow_subgraph_deployment : & DeploymentId ,
61
+ escrow_subgraph : & ' static SubgraphClient ,
66
62
indexer_address : & Address ,
67
63
) -> Result < HashMap < Address , U256 > > {
64
+ // Types for deserializing the network subgraph response
65
+ #[ derive( Deserialize ) ]
66
+ #[ serde( rename_all = "camelCase" ) ]
67
+ struct EscrowAccountsResponse {
68
+ escrow_accounts : Vec < EscrowAccount > ,
69
+ }
68
70
// These 2 structs are used to deserialize the response from the escrow subgraph.
69
71
// Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal
70
72
// representation. This is why we deserialize them as strings below.
71
73
#[ derive( Deserialize ) ]
72
- struct _Sender {
73
- id : Address ,
74
- }
75
- #[ derive( Deserialize ) ]
76
- struct _EscrowAccount {
74
+ #[ serde( rename_all = "camelCase" ) ]
75
+ struct EscrowAccount {
77
76
balance : String ,
78
- #[ serde( rename = "totalAmountThawing" ) ]
79
77
total_amount_thawing : String ,
80
- sender : _Sender ,
78
+ sender : Sender ,
79
+ }
80
+ #[ derive( Deserialize ) ]
81
+ #[ serde( rename_all = "camelCase" ) ]
82
+ struct Sender {
83
+ id : Address ,
81
84
}
82
85
83
- let res = graph_node
84
- . subgraph_query_raw (
85
- escrow_subgraph_deployment,
86
- serde_json:: to_string ( & json ! ( {
87
- "query" : r#"
86
+ let response = escrow_subgraph
87
+ . query :: < EscrowAccountsResponse > ( & json ! ( {
88
+ "query" : r#"
88
89
query ($indexer: ID!) {
89
90
escrowAccounts(where: {receiver_: {id: $indexer}}) {
90
91
balance
@@ -95,60 +96,54 @@ impl EscrowMonitor {
95
96
}
96
97
}
97
98
"# ,
98
- "variables" : {
99
- "indexer" : indexer_address,
100
- }
101
- }
102
- ) )
103
- . expect ( "serialize escrow GraphQL query" ) ,
104
- )
99
+ "variables" : {
100
+ "indexer" : indexer_address,
101
+ }
102
+ }
103
+ ) )
105
104
. await ?;
106
105
107
- let mut res_json: serde_json:: Value = serde_json:: from_str ( res. graphql_response . as_str ( ) )
108
- . map_err ( |e| {
109
- anyhow:: anyhow!(
110
- "Failed to fetch current accounts from escrow subgraph: {}" ,
111
- e
112
- )
113
- } ) ?;
106
+ // If there are any GraphQL errors returned, we'll log them for debugging
107
+ if let Some ( errors) = response. errors {
108
+ warn ! (
109
+ "Errors encountered fetching escrow accounts for indexer {:?}: {}" ,
110
+ indexer_address,
111
+ errors
112
+ . into_iter( )
113
+ . map( |e| e. message)
114
+ . collect:: <Vec <_>>( )
115
+ . join( ", " )
116
+ ) ;
117
+ }
114
118
115
- let escrow_accounts: Vec < _EscrowAccount > =
116
- serde_json:: from_value ( res_json[ "data" ] [ "escrowAccounts" ] . take ( ) ) . map_err ( |e| {
117
- anyhow:: anyhow!(
118
- "Failed to parse current accounts response from escrow subgraph: {}" ,
119
- e
119
+ let sender_accounts = response
120
+ . data
121
+ . map_or ( vec ! [ ] , |data| data. escrow_accounts )
122
+ . iter ( )
123
+ . map ( |account| {
124
+ let balance = U256 :: checked_sub (
125
+ U256 :: from_str_radix ( & account. balance , 16 ) ?,
126
+ U256 :: from_str_radix ( & account. total_amount_thawing , 16 ) ?,
120
127
)
121
- } ) ?;
122
-
123
- let mut sender_accounts: HashMap < Address , U256 > = HashMap :: new ( ) ;
124
-
125
- for account in escrow_accounts {
126
- let balance = U256 :: checked_sub (
127
- U256 :: from_dec_str ( & account. balance ) ?,
128
- U256 :: from_dec_str ( & account. total_amount_thawing ) ?,
129
- )
130
- . unwrap_or_else ( || {
131
- error ! (
132
- "Balance minus total amount thawing underflowed for account {}. Setting balance to 0, no queries \
133
- will be served for this sender.",
134
- account. sender. id
135
- ) ;
136
- U256 :: from ( 0 )
137
- } ) ;
138
-
139
- sender_accounts. insert ( account. sender . id , balance) ;
140
- }
128
+ . unwrap_or_else ( || {
129
+ warn ! (
130
+ "Balance minus total amount thawing underflowed for account {}. \
131
+ Setting balance to 0, no queries will be served for this sender.",
132
+ account. sender. id
133
+ ) ;
134
+ U256 :: from ( 0 )
135
+ } ) ;
136
+
137
+ Ok ( ( account. sender . id , balance) )
138
+ } )
139
+ . collect :: < Result < HashMap < _ , _ > , anyhow:: Error > > ( ) ?;
141
140
142
141
Ok ( sender_accounts)
143
142
}
144
143
145
144
async fn update_accounts ( inner : & Arc < EscrowMonitorInner > ) -> Result < ( ) , anyhow:: Error > {
146
- * ( inner. sender_accounts . write ( ) . await ) = Self :: current_accounts (
147
- & inner. graph_node ,
148
- & inner. escrow_subgraph_deployment ,
149
- & inner. indexer_address ,
150
- )
151
- . await ?;
145
+ * ( inner. sender_accounts . write ( ) . await ) =
146
+ Self :: current_accounts ( inner. escrow_subgraph , & inner. indexer_address ) . await ?;
152
147
Ok ( ( ) )
153
148
}
154
149
@@ -181,50 +176,47 @@ impl EscrowMonitor {
181
176
182
177
#[ cfg( test) ]
183
178
mod tests {
184
- use std:: str:: FromStr ;
185
-
186
179
use wiremock:: matchers:: { method, path} ;
187
180
use wiremock:: { Mock , MockServer , ResponseTemplate } ;
188
181
189
- use crate :: { graph_node, test_vectors} ;
182
+ use crate :: test_vectors;
183
+ use crate :: test_vectors:: { ESCROW_SUBGRAPH_DEPLOYMENT , INDEXER_ADDRESS } ;
190
184
191
185
use super :: * ;
192
186
193
187
#[ tokio:: test]
194
188
async fn test_current_accounts ( ) {
195
- let indexer_address = Address :: from_str ( test_vectors:: INDEXER_ADDRESS ) . unwrap ( ) ;
196
- let escrow_subgraph_deployment =
197
- DeploymentId :: from_str ( "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss" ) . unwrap ( ) ;
198
-
189
+ // Set up a mock escrow subgraph
199
190
let mock_server = MockServer :: start ( ) . await ;
200
- let graph_node = graph_node:: GraphNodeInstance :: new ( & mock_server. uri ( ) ) ;
191
+ let escrow_subgraph_endpoint = SubgraphClient :: local_deployment_endpoint (
192
+ & mock_server. uri ( ) ,
193
+ & test_vectors:: ESCROW_SUBGRAPH_DEPLOYMENT ,
194
+ )
195
+ . unwrap ( ) ;
196
+ let escrow_subgraph = Box :: leak ( Box :: new (
197
+ SubgraphClient :: new (
198
+ Some ( & mock_server. uri ( ) ) ,
199
+ Some ( & test_vectors:: ESCROW_SUBGRAPH_DEPLOYMENT ) ,
200
+ escrow_subgraph_endpoint. as_ref ( ) ,
201
+ )
202
+ . unwrap ( ) ,
203
+ ) ) ;
201
204
202
205
let mock = Mock :: given ( method ( "POST" ) )
203
- . and ( path (
204
- "/subgraphs/id/" . to_string ( ) + & escrow_subgraph_deployment. to_string ( ) ,
205
- ) )
206
+ . and ( path ( format ! (
207
+ "/subgraphs/id/{}" ,
208
+ * ESCROW_SUBGRAPH_DEPLOYMENT
209
+ ) ) )
206
210
. respond_with (
207
211
ResponseTemplate :: new ( 200 )
208
212
. set_body_raw ( test_vectors:: ESCROW_QUERY_RESPONSE , "application/json" ) ,
209
213
) ;
210
214
mock_server. register ( mock) . await ;
211
215
212
- let inner = EscrowMonitorInner {
213
- graph_node,
214
- escrow_subgraph_deployment,
215
- indexer_address,
216
- interval_ms : 1000 ,
217
- sender_accounts : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
218
- } ;
219
-
220
- let accounts = EscrowMonitor :: current_accounts (
221
- & inner. graph_node ,
222
- & inner. escrow_subgraph_deployment ,
223
- & inner. indexer_address ,
224
- )
225
- . await
226
- . unwrap ( ) ;
216
+ let accounts = EscrowMonitor :: current_accounts ( escrow_subgraph, & INDEXER_ADDRESS )
217
+ . await
218
+ . unwrap ( ) ;
227
219
228
- assert_eq ! ( accounts, test_vectors:: expected_escrow_accounts ( ) ) ;
220
+ assert_eq ! ( accounts, * test_vectors:: ESCROW_ACCOUNTS ) ;
229
221
}
230
222
}
0 commit comments