Skip to content

Commit f626f28

Browse files
committed
add: exhausted impl sentry
1 parent d3acacd commit f626f28

File tree

4 files changed

+54
-1
lines changed

4 files changed

+54
-1
lines changed

adapter/src/ethereum.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,7 @@ mod test {
666666
ad_units: vec![],
667667
pricing_bounds: None,
668668
},
669+
exhausted: Default::default(),
669670
};
670671

671672
// convert to eth channel

primitives/src/channel.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,16 @@ impl SpecValidators {
232232
}
233233
}
234234

235+
pub fn find_index(&self, validator_id: &ValidatorId) -> i32 {
236+
if &self.leader().id == validator_id {
237+
0
238+
} else if &self.follower().id == validator_id {
239+
1
240+
} else {
241+
-1
242+
}
243+
}
244+
235245
pub fn iter(&self) -> Iter<'_> {
236246
Iter::new(&self)
237247
}

sentry/src/db/channel.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,30 @@ pub async fn insert_validator_messages(
9696
.await
9797
}
9898

99+
pub async fn update_exhausted_channel(
100+
pool: &DbPool,
101+
channel: &Channel,
102+
index: i32,
103+
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
104+
pool
105+
.run(move | connection| {
106+
async move {
107+
match connection.prepare("UPDATE channels SET exhausted[$1] = true WHERE id = $2").await {
108+
Ok(stmt) => match connection.execute(&stmt, &[&index, &channel.id]).await {
109+
Ok(row) => {
110+
let updated = row == 1;
111+
Ok((updated, connection))
112+
},
113+
Err(e) => Err((e, connection)),
114+
},
115+
Err(e) => Err((e, connection)),
116+
}
117+
}
118+
})
119+
.await
120+
121+
}
122+
99123
mod list_channels {
100124
use crate::db::DbPool;
101125
use bb8::RunError;

sentry/src/routes/channel.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::db::event_aggregate::{latest_approve_state, latest_heartbeats, latest_new_state};
2-
use crate::db::{get_channel_by_id, insert_channel, insert_validator_messages, list_channels};
2+
use crate::db::{get_channel_by_id, insert_channel, insert_validator_messages, list_channels, update_exhausted_channel};
33
use crate::{success_response, Application, Auth, ResponseError, RouteParams, Session};
44
use bb8::RunError;
55
use bb8_postgres::tokio_postgres::error;
@@ -238,6 +238,13 @@ pub async fn create_validator_messages<A: Adapter + 'static>(
238238
.get("messages")
239239
.ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
240240

241+
let channel_is_exhausted = messages.iter().any(|message| match message {
242+
MessageTypes::ApproveState(approve) => approve.exhausted,
243+
MessageTypes::NewState(new_state) => new_state.exhausted,
244+
_ => false,
245+
});
246+
247+
241248
match channel.spec.validators.find(&session.uid) {
242249
None => Err(ResponseError::Unauthorized),
243250
_ => {
@@ -246,6 +253,17 @@ pub async fn create_validator_messages<A: Adapter + 'static>(
246253
}))
247254
.await?;
248255

256+
if channel_is_exhausted {
257+
// can never be -1
258+
let validator_index = channel.spec.validators.find_index(&session.uid);
259+
update_exhausted_channel(
260+
&app.pool,
261+
&channel,
262+
validator_index
263+
).await?;
264+
}
265+
266+
249267
Ok(success_response(serde_json::to_string(&SuccessResponse {
250268
success: true,
251269
})?))

0 commit comments

Comments
 (0)