Skip to content

Commit 3a17267

Browse files
authored
Merge pull request #354 from samparsky/add-exhausted-property
Add exhausted property
2 parents bc4308b + 4d8e2e4 commit 3a17267

File tree

11 files changed

+76
-3
lines changed

11 files changed

+76
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Requirements:
2222
- Docker
2323

2424
#### Linux
25+
2526
- `build-essentials` is required to build the project (error: `linker ``cc`` not found`)
2627
- The crate `openssl-sys` requires `libssl-dev` and `pkg-config` for Ubuntu.
2728

adapter/src/ethereum.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,7 @@ mod test {
679679
ad_units: vec![],
680680
pricing_bounds: None,
681681
},
682+
exhausted: Default::default(),
682683
};
683684

684685
// convert to eth channel

hooks/post_checkout

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
2+
# Docker hub does a recursive clone, then checks the branch out,
3+
# so when a PR adds a submodule (or updates it), it fails.
4+
git submodule update --init

primitives/src/channel.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ pub struct Channel {
9898
#[serde(default)]
9999
pub targeting_rules: Rules,
100100
pub spec: ChannelSpec,
101+
#[serde(default)]
102+
pub exhausted: Vec<bool>,
103+
}
104+
105+
pub fn channel_exhausted(channel: &Channel) -> bool {
106+
channel.exhausted.len() == 2 && channel.exhausted.iter().all(|&x| x)
101107
}
102108

103109
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
@@ -226,6 +232,16 @@ impl SpecValidators {
226232
}
227233
}
228234

235+
pub fn find_index(&self, validator_id: &ValidatorId) -> Option<i32> {
236+
if &self.leader().id == validator_id {
237+
Some(0)
238+
} else if &self.follower().id == validator_id {
239+
Some(1)
240+
} else {
241+
None
242+
}
243+
}
244+
229245
pub fn iter(&self) -> Iter<'_> {
230246
Iter::new(&self)
231247
}
@@ -387,6 +403,7 @@ pub mod postgres {
387403
valid_until: row.get("valid_until"),
388404
targeting_rules: row.get::<_, Json<Rules>>("targeting_rules").0,
389405
spec: row.get::<_, Json<ChannelSpec>>("spec").0,
406+
exhausted: row.get("exhausted"),
390407
}
391408
}
392409
}

primitives/src/util/tests/prep_db.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ lazy_static! {
8383
ad_units: vec![],
8484
pricing_bounds: Some(PricingBounds {impression: None, click: Some(Pricing { max: 0.into(), min: 0.into()})}),
8585
},
86+
exhausted: Default::default(),
8687
}
8788
};
8889

primitives/src/validator.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ pub struct ApproveState {
165165
pub state_root: String,
166166
pub signature: String,
167167
pub is_healthy: bool,
168+
#[serde(default)]
169+
pub exhausted: bool,
168170
}
169171

170172
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
@@ -173,6 +175,8 @@ pub struct NewState {
173175
pub state_root: String,
174176
pub signature: String,
175177
pub balances: BalancesMap,
178+
#[serde(default)]
179+
pub exhausted: bool,
176180
}
177181

178182
#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]

sentry/migrations/20190806011140_initial-tables/up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CREATE TABLE channels
66
deposit_amount VARCHAR(255) NOT NULL,
77
valid_until TIMESTAMP(2) WITH TIME ZONE NOT NULL,
88
spec JSONB NOT NULL,
9+
exhausted BOOLEAN[2]
910

1011
PRIMARY KEY (id)
1112
);

sentry/src/db/channel.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@ pub async fn insert_validator_messages(
9696
.await
9797
}
9898

99+
pub async fn update_exhausted_channel(
100+
pool: &DbPool,
101+
channel: &Channel,
102+
index: i32,
103+
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
104+
pool.run(move |connection| async move {
105+
match connection
106+
.prepare("UPDATE channels SET exhausted[$1] = true WHERE id = $2")
107+
.await
108+
{
109+
Ok(stmt) => match connection.execute(&stmt, &[&index, &channel.id]).await {
110+
Ok(row) => {
111+
let updated = row == 1;
112+
Ok((updated, connection))
113+
}
114+
Err(e) => Err((e, connection)),
115+
},
116+
Err(e) => Err((e, connection)),
117+
}
118+
})
119+
.await
120+
}
121+
99122
mod list_channels {
100123
use crate::db::DbPool;
101124
use bb8::RunError;

sentry/src/routes/channel.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use crate::db::event_aggregate::{latest_approve_state, latest_heartbeats, latest_new_state};
2-
use crate::db::{get_channel_by_id, insert_channel, insert_validator_messages, list_channels};
2+
use crate::db::{
3+
get_channel_by_id, insert_channel, insert_validator_messages, list_channels,
4+
update_exhausted_channel,
5+
};
36
use crate::{success_response, Application, Auth, ResponseError, RouteParams, Session};
47
use bb8::RunError;
58
use bb8_postgres::tokio_postgres::error;
@@ -238,6 +241,12 @@ pub async fn create_validator_messages<A: Adapter + 'static>(
238241
.get("messages")
239242
.ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
240243

244+
let channel_is_exhausted = messages.iter().any(|message| match message {
245+
MessageTypes::ApproveState(approve) => approve.exhausted,
246+
MessageTypes::NewState(new_state) => new_state.exhausted,
247+
_ => false,
248+
});
249+
241250
match channel.spec.validators.find(&session.uid) {
242251
None => Err(ResponseError::Unauthorized),
243252
_ => {
@@ -246,6 +255,12 @@ pub async fn create_validator_messages<A: Adapter + 'static>(
246255
}))
247256
.await?;
248257

258+
if channel_is_exhausted {
259+
if let Some(validator_index) = channel.spec.validators.find_index(&session.uid) {
260+
update_exhausted_channel(&app.pool, &channel, validator_index).await?;
261+
}
262+
}
263+
249264
Ok(success_response(serde_json::to_string(&SuccessResponse {
250265
success: true,
251266
})?))

validator_worker/src/follower.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::fmt;
33

44
use primitives::adapter::{Adapter, AdapterErrorKind};
55
use primitives::validator::{ApproveState, MessageTypes, NewState, RejectState};
6-
use primitives::BalancesMap;
6+
use primitives::{BalancesMap, BigNum};
77

88
use crate::core::follower_rules::{get_health, is_valid_transition};
99
use crate::heartbeat::{heartbeat, HeartbeatStatus};
@@ -140,12 +140,14 @@ async fn on_new_state<'a, A: Adapter + 'static>(
140140
let signature = iface.adapter.sign(&new_state.state_root)?;
141141
let health_threshold = u64::from(iface.config.health_threshold_promilles);
142142
let is_healthy = health >= health_threshold;
143+
let exhausted = proposed_balances.values().sum::<BigNum>() == iface.channel.deposit_amount;
143144

144145
let propagation_result = iface
145146
.propagate(&[&MessageTypes::ApproveState(ApproveState {
146147
state_root: proposed_state_root,
147148
signature,
148149
is_healthy,
150+
exhausted,
149151
})])
150152
.await;
151153

0 commit comments

Comments
 (0)