Skip to content

Commit aba5712

Browse files
authored
Merge pull request #550 from AmbireTech/sentry-routes-improvement
Sentry Channel routes improvement
2 parents 32b6b92 + 5262230 commit aba5712

File tree

2 files changed

+69
-65
lines changed

2 files changed

+69
-65
lines changed

sentry/src/db/validator_message.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use chrono::Utc;
22
use tokio_postgres::types::ToSql;
33

44
use primitives::{
5-
balances::UncheckedState,
5+
balances::BalancesState,
66
sentry::{message::MessageResponse, validator_messages::ValidatorMessage},
77
validator::{ApproveState, Heartbeat, MessageType, MessageTypes, NewState},
88
Channel, ChannelId, ValidatorId,
@@ -102,11 +102,11 @@ pub async fn latest_approve_state(
102102
/// Returns the latest [`NewState`] message for this [`Channel`] and the provided `state_root`.
103103
///
104104
/// Ordered by: `received DESC`
105-
pub async fn latest_new_state(
105+
pub async fn latest_new_state<S: BalancesState>(
106106
pool: &DbPool,
107107
channel: &Channel,
108108
state_root: &str,
109-
) -> Result<Option<MessageResponse<NewState<UncheckedState>>>, PoolError> {
109+
) -> Result<Option<MessageResponse<NewState<S>>>, PoolError> {
110110
let client = pool.get().await?;
111111

112112
let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'NewState' AND msg->> 'stateRoot' = $3 ORDER BY received DESC LIMIT 1").await?;
@@ -115,7 +115,7 @@ pub async fn latest_new_state(
115115
.await?;
116116

117117
rows.get(0)
118-
.map(MessageResponse::<NewState<UncheckedState>>::try_from)
118+
.map(MessageResponse::<NewState<S>>::try_from)
119119
.transpose()
120120
.map_err(PoolError::Backend)
121121
}

sentry/src/routes/channel.rs

Lines changed: 65 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@ use adapter::{
1313
Adapter, Dummy,
1414
};
1515
use primitives::{
16-
balances::{Balances, CheckedState, UncheckedState},
16+
balances::{Balances, BalancesState, CheckedState, UncheckedState},
1717
merkle_tree::MerkleTree,
1818
sentry::{
1919
channel_list::{ChannelListQuery, ChannelListResponse},
20+
message::MessageResponse,
2021
AccountingResponse, AllSpendersQuery, AllSpendersResponse, ChannelPayRequest,
2122
GetLeafResponse, LastApproved, LastApprovedQuery, LastApprovedResponse, SpenderResponse,
2223
SuccessResponse,
2324
},
2425
spender::{Spendable, Spender},
25-
validator::NewState,
26+
validator::{ApproveState, NewState},
2627
Address, ChainOf, Channel, ChannelId, Deposit, UnifiedNum,
2728
};
2829

@@ -97,17 +98,11 @@ pub async fn last_approved<C: Locked + 'static>(
9798
heartbeats: None,
9899
});
99100

100-
let approve_state = match latest_approve_state(&app.pool, &channel).await? {
101-
Some(approve_state) => approve_state,
102-
None => return Ok(default_response),
103-
};
104-
105-
let state_root = approve_state.msg.state_root.clone();
106-
107-
let new_state = latest_new_state(&app.pool, &channel, &state_root).await?;
108-
if new_state.is_none() {
109-
return Ok(default_response);
110-
}
101+
let (approve_state, new_state) =
102+
match get_corresponding_states::<UncheckedState>(&app.pool, &app.logger, &channel).await? {
103+
Some(states) => states,
104+
None => return Ok(default_response),
105+
};
111106

112107
let validators = vec![channel.leader, channel.follower];
113108
let channel_id = channel.id();
@@ -126,7 +121,7 @@ pub async fn last_approved<C: Locked + 'static>(
126121

127122
Ok(Json(LastApprovedResponse {
128123
last_approved: Some(LastApproved {
129-
new_state,
124+
new_state: Some(new_state),
130125
approve_state: Some(approve_state),
131126
}),
132127
heartbeats,
@@ -193,19 +188,21 @@ pub async fn get_spender_limits<C: Locked + 'static>(
193188
}
194189
};
195190

196-
let new_state = match get_corresponding_new_state(&app.pool, &app.logger, channel).await? {
197-
Some(new_state) => new_state,
198-
None => {
199-
return Ok(Json(SpenderResponse {
200-
spender: Spender {
201-
total_deposited: latest_spendable.deposit.total,
202-
total_spent: None,
203-
},
204-
}))
205-
}
206-
};
191+
let (_, new_state) =
192+
match get_corresponding_states::<CheckedState>(&app.pool, &app.logger, channel).await? {
193+
Some(new_state) => new_state,
194+
None => {
195+
return Ok(Json(SpenderResponse {
196+
spender: Spender {
197+
total_deposited: latest_spendable.deposit.total,
198+
total_spent: None,
199+
},
200+
}))
201+
}
202+
};
207203

208204
let total_spent = new_state
205+
.msg
209206
.balances
210207
.spenders
211208
.get(&spender)
@@ -235,7 +232,8 @@ pub async fn get_all_spender_limits<C: Locked + 'static>(
235232
.checked_mul(limit.into())
236233
.ok_or_else(|| ResponseError::FailedValidation("Page and/or limit is too large".into()))?;
237234

238-
let new_state = get_corresponding_new_state(&app.pool, &app.logger, &channel).await?;
235+
let corresponding_states =
236+
get_corresponding_states::<CheckedState>(&app.pool, &app.logger, &channel).await?;
239237

240238
let mut all_spender_limits: HashMap<Address, Spender> = HashMap::new();
241239

@@ -245,16 +243,20 @@ pub async fn get_all_spender_limits<C: Locked + 'static>(
245243
// Using for loop to avoid async closures
246244
for spendable in all_spendables {
247245
let spender = spendable.spender;
248-
let total_spent = match new_state {
249-
Some(ref new_state) => new_state.balances.spenders.get(&spender).map(|balance| {
250-
spendable
251-
.deposit
252-
.total
253-
.checked_sub(balance)
254-
.unwrap_or_default()
255-
}),
256-
None => None,
257-
};
246+
let total_spent = corresponding_states.as_ref().and_then(|(_, new_state)| {
247+
new_state
248+
.msg
249+
.balances
250+
.spenders
251+
.get(&spender)
252+
.map(|balance| {
253+
spendable
254+
.deposit
255+
.total
256+
.checked_sub(balance)
257+
.unwrap_or_default()
258+
})
259+
});
258260

259261
let spender_info = Spender {
260262
total_deposited: spendable.deposit.total,
@@ -300,8 +302,10 @@ pub async fn add_spender_leaf<C: Locked + 'static>(
300302
}
301303
};
302304

303-
let new_state =
304-
match get_corresponding_new_state(&app.pool, &app.logger, &channel.context).await? {
305+
let (_, new_state) =
306+
match get_corresponding_states::<CheckedState>(&app.pool, &app.logger, &channel.context)
307+
.await?
308+
{
305309
Some(new_state) => new_state,
306310
None => {
307311
return Ok(Json(SpenderResponse {
@@ -313,11 +317,7 @@ pub async fn add_spender_leaf<C: Locked + 'static>(
313317
}
314318
};
315319

316-
let total_spent = new_state
317-
.balances
318-
.spenders
319-
.get(&spender)
320-
.map(|spent| spent.to_owned());
320+
let total_spent = new_state.msg.balances.spenders.get(&spender).copied();
321321

322322
Ok(Json(SpenderResponse {
323323
spender: Spender {
@@ -327,28 +327,38 @@ pub async fn add_spender_leaf<C: Locked + 'static>(
327327
}))
328328
}
329329

330-
async fn get_corresponding_new_state(
330+
/// Retrieves the [`Channel`]'s latest [`ApproveState`] and the corresponding
331+
/// approved [`NewState`].
332+
///
333+
/// # Errors
334+
///
335+
/// - Returns [`ResponseError::NotFound`] if there is no [`ApproveState`]
336+
/// generated for the [`Channel`], i.e. the [`Channel`] is new.
337+
///
338+
/// - Returns [`ResponseError::BadRequest`] if the [`ApproveState`]'s
339+
/// corresponding approve [`NewState`] is not found.
340+
///
341+
/// - Returns [`ResponseError::BadRequest`] if the [`NewState`]'s
342+
/// [`Balances`] don't align, i.e. `sum(earner) != sum(spenders)`
343+
///
344+
/// [`ApproveState`]: primitives::validator::ApproveState
345+
async fn get_corresponding_states<S: BalancesState>(
331346
pool: &DbPool,
332347
logger: &Logger,
333348
channel: &Channel,
334-
) -> Result<Option<NewState<CheckedState>>, ResponseError> {
349+
) -> Result<Option<(MessageResponse<ApproveState>, MessageResponse<NewState<S>>)>, ResponseError> {
335350
let approve_state = match latest_approve_state(pool, channel).await? {
336351
Some(approve_state) => approve_state,
337352
None => return Ok(None),
338353
};
339354

340355
let state_root = approve_state.msg.state_root.clone();
341356

342-
match latest_new_state(pool, channel, &state_root).await? {
343-
Some(new_state) => {
344-
let new_state = new_state.msg.into_inner().try_checked().map_err(|err| {
345-
error!(&logger, "Balances are not aligned in an approved NewState: {}", &err; "module" => "get_spender_limits");
346-
ResponseError::BadRequest("Balances are not aligned in an approved NewState".to_string())
347-
})?;
348-
Ok(Some(new_state))
349-
}
357+
match latest_new_state::<S>(pool, channel, &state_root).await? {
358+
Some(new_state) => Ok(Some((approve_state, new_state))),
350359
None => {
351-
error!(&logger, "{}", "Fatal error! The NewState for the last ApproveState was not found"; "module" => "get_spender_limits");
360+
error!(&logger, "{}", "Fatal error! The NewState for the last ApproveState was not found"; "module" => "routes::channel");
361+
352362
Err(ResponseError::BadRequest(
353363
"Fatal error! The NewState for the last ApproveState was not found".to_string(),
354364
))
@@ -520,16 +530,10 @@ pub async fn get_leaf<C: Locked + 'static>(
520530
) -> Result<Json<GetLeafResponse>, ResponseError> {
521531
let channel = channel_context.context;
522532

523-
let approve_state = latest_approve_state(&app.pool, &channel)
533+
let (_, new_state) = get_corresponding_states::<CheckedState>(&app.pool, &app.logger, &channel)
524534
.await?
525535
.ok_or(ResponseError::NotFound)?;
526536

527-
let state_root = approve_state.msg.state_root.clone();
528-
529-
let new_state = latest_new_state(&app.pool, &channel, &state_root)
530-
.await?
531-
.ok_or_else(|| ResponseError::BadRequest("No NewState message for spender".to_string()))?;
532-
533537
let addr = params.1;
534538

535539
let element = match leaf_for {

0 commit comments

Comments
 (0)