Skip to content

Commit b389d12

Browse files
author
Ivo Georgiev
authored
Merge pull request #274 from AdExNetwork/dev
Fast forward master to dev: validator worker fixes
2 parents 3ff99e9 + 274d92e commit b389d12

File tree

5 files changed

+76
-47
lines changed

5 files changed

+76
-47
lines changed

adapter/src/ethereum.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ impl Adapter for EthereumAdapter {
115115

116116
fn sign(&self, state_root: &str) -> AdapterResult<String> {
117117
if let Some(wallet) = &self.wallet {
118-
let message = Message::from_slice(&hash_message(state_root));
118+
let state_root = hex::decode(state_root)
119+
.map_err(|_| AdapterError::Signature("invalid state_root".to_string()))?;
120+
let message = Message::from_slice(&hash_message(&state_root));
119121
let wallet_sign = wallet
120122
.sign(&self.keystore_pwd, &message)
121123
.map_err(|_| map_error("failed to sign messages"))?;
@@ -130,11 +132,16 @@ impl Adapter for EthereumAdapter {
130132
}
131133

132134
fn verify(&self, signer: &ValidatorId, state_root: &str, sig: &str) -> AdapterResult<bool> {
133-
let decoded_signature = hex::decode(sig)
135+
if !sig.starts_with("0x") {
136+
return Err(AdapterError::Signature("not 0x prefixed hex".to_string()));
137+
}
138+
let decoded_signature = hex::decode(&sig[2..])
134139
.map_err(|_| AdapterError::Signature("invalid signature".to_string()))?;
135140
let address = Address::from_slice(signer.inner());
136141
let signature = Signature::from_electrum(&decoded_signature);
137-
let message = Message::from_slice(&hash_message(state_root));
142+
let state_root = hex::decode(state_root)
143+
.map_err(|_| AdapterError::Signature("invalid state_root".to_string()))?;
144+
let message = Message::from_slice(&hash_message(&state_root));
138145

139146
verify_address(&address, &signature, &message).or_else(|_| Ok(false))
140147
}
@@ -314,14 +321,13 @@ impl RelayerClient {
314321
}
315322
}
316323

317-
fn hash_message(message: &str) -> [u8; 32] {
324+
fn hash_message(message: &[u8]) -> [u8; 32] {
318325
let eth = "\x19Ethereum Signed Message:\n";
319326
let message_length = message.len();
320327

321-
let encoded = format!("{}{}{}", eth, message_length, message);
322-
323328
let mut result = Keccak::new_keccak256();
324-
result.update(&encoded.as_bytes());
329+
result.update(&format!("{}{}", eth, message_length).as_bytes());
330+
result.update(&message);
325331

326332
let mut res: [u8; 32] = [0; 32];
327333
result.finalize(&mut res);
@@ -371,10 +377,9 @@ pub fn ewt_sign(
371377

372378
let payload_encoded =
373379
base64::encode_config(&serde_json::to_string(payload)?, base64::URL_SAFE_NO_PAD);
374-
let message = Message::from_slice(&hash_message(&format!(
375-
"{}.{}",
376-
header_encoded, payload_encoded
377-
)));
380+
let message = Message::from_slice(&hash_message(
381+
&format!("{}.{}", header_encoded, payload_encoded).as_bytes(),
382+
));
378383
let signature: Signature = signer
379384
.sign(password, &message)
380385
.map_err(|_| map_error("sign message"))?
@@ -394,10 +399,9 @@ pub fn ewt_verify(
394399
payload_encoded: &str,
395400
token: &str,
396401
) -> Result<VerifyPayload, Box<dyn Error>> {
397-
let message = Message::from_slice(&hash_message(&format!(
398-
"{}.{}",
399-
header_encoded, payload_encoded
400-
)));
402+
let message = Message::from_slice(&hash_message(
403+
&format!("{}.{}", header_encoded, payload_encoded).as_bytes(),
404+
));
401405

402406
let decoded_signature = base64::decode_config(&token, base64::URL_SAFE_NO_PAD)?;
403407
let signature = Signature::from_electrum(&decoded_signature);
@@ -466,19 +470,19 @@ mod test {
466470

467471
// Sign
468472
let expected_response =
469-
"0xce654de0b3d14d63e1cb3181eee7a7a37ef4a06c9fabc204faf96f26357441b625b1be460fbe8f5278cc02aa88a5d0ac2f238e9e3b8e4893760d33bccf77e47f1b";
473+
"0x625fd46f82c4cfd135ea6a8534e85dbf50beb157046dce59d2e97aacdf4e38381d1513c0e6f002b2f05c05458038b187754ff38cc0658dfc9ba854cccfb6e13e1b";
470474
let message = "2bdeafae53940669daa6f519373f686c";
471475
let response = eth_adapter.sign(message).expect("failed to sign message");
472476
assert_eq!(expected_response, response, "invalid signature");
473477

474478
// Verify
475479
let signature =
476-
"ce654de0b3d14d63e1cb3181eee7a7a37ef4a06c9fabc204faf96f26357441b625b1be460fbe8f5278cc02aa88a5d0ac2f238e9e3b8e4893760d33bccf77e47f1b";
480+
"0x9e07f12958ce7c5eb1362eb9461e4745dd9d74a42b921391393caea700bfbd6e1ad876a7d8f9202ef1fe6110dbfe87840c5676ca5c4fda9f3330694a1ac2a1fc1b";
477481
let verify = eth_adapter
478482
.verify(
479-
&ValidatorId::try_from("2bDeAFAE53940669DaA6F519373f686c1f3d3393")
483+
&ValidatorId::try_from("2892f6C41E0718eeeDd49D98D648C789668cA67d")
480484
.expect("Failed to parse id"),
481-
"2bdeafae53940669daa6f519373f686c",
485+
"8bc45d8eb27f4c98cab35d17b0baecc2a263d6831ef0800f4c190cbfac6d20a3",
482486
&signature,
483487
)
484488
.expect("Failed to verify signatures");

primitives/src/util/tests/prep_db.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{
2+
channel::{Pricing, PricingBounds},
23
BigNum, Channel, ChannelId, ChannelSpec, EventSubmission, SpecValidators, ValidatorDesc,
34
ValidatorId,
45
};
@@ -69,7 +70,7 @@ lazy_static! {
6970
title: None,
7071
validators: SpecValidators::new(DUMMY_VALIDATOR_LEADER.clone(), DUMMY_VALIDATOR_FOLLOWER.clone()),
7172
max_per_impression: 10.into(),
72-
min_per_impression: 10.into(),
73+
min_per_impression: 1.into(),
7374
targeting: vec![],
7475
min_targeting_score: None,
7576
event_submission: Some(EventSubmission { allow: vec![] }),
@@ -79,7 +80,7 @@ lazy_static! {
7980
nonce: Some(nonce),
8081
withdraw_period_start: Utc.timestamp_millis(4_073_414_400_000),
8182
ad_units: vec![],
82-
pricing_bounds: None,
83+
pricing_bounds: Some(PricingBounds {impression: None, click: Some(Pricing { max: 0.into(), min: 0.into()})}),
8384
},
8485
}
8586
};

sentry/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2828
)
2929
.arg(
3030
Arg::with_name("adapter")
31+
.long("adapter")
3132
.short("a")
3233
.help("the adapter for authentication and signing")
3334
.required(true)
@@ -37,18 +38,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3738
)
3839
.arg(
3940
Arg::with_name("keystoreFile")
41+
.long("keystoreFile")
4042
.short("k")
4143
.help("path to the JSON Ethereum Keystore file")
4244
.takes_value(true),
4345
)
4446
.arg(
4547
Arg::with_name("dummyIdentity")
48+
.long("dummyIdentity")
4649
.short("i")
4750
.help("the identity to use with the dummy adapter")
4851
.takes_value(true),
4952
)
5053
.arg(
5154
Arg::with_name("clustered")
55+
.long("clustered")
5256
.short("c")
5357
.help("Run app in cluster mode with multiple workers"),
5458
)

validator_worker/src/error.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use primitives::ChannelId;
12
use serde::{Deserialize, Serialize};
23
use std::error::Error;
34
use std::fmt::{Display, Formatter, Result};
@@ -6,15 +7,19 @@ use std::fmt::{Display, Formatter, Result};
67
pub enum ValidatorWorker {
78
Configuration(String),
89
Failed(String),
10+
Channel(ChannelId, String),
911
}
1012

1113
impl Error for ValidatorWorker {}
1214

1315
impl Display for ValidatorWorker {
1416
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
1517
match self {
16-
ValidatorWorker::Configuration(error) => write!(f, "Configuration error: {}", error),
17-
ValidatorWorker::Failed(error) => write!(f, "error: {}", error),
18+
ValidatorWorker::Configuration(err) => write!(f, "Configuration error: {}", err),
19+
ValidatorWorker::Failed(err) => write!(f, "error: {}", err),
20+
ValidatorWorker::Channel(channel_id, err) => {
21+
write!(f, "Channel {}: {}", channel_id, err)
22+
}
1823
}
1924
}
2025
}

validator_worker/src/main.rs

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::error::Error;
66
use std::time::Duration;
77

88
use clap::{App, Arg};
9-
use futures::future::{join, try_join_all};
9+
use futures::future::{join, join_all};
1010
use tokio::runtime::Runtime;
1111
use tokio::time::{delay_for, timeout};
1212

@@ -36,6 +36,7 @@ fn main() -> Result<(), Box<dyn Error>> {
3636
)
3737
.arg(
3838
Arg::with_name("adapter")
39+
.long("adapter")
3940
.short("a")
4041
.help("the adapter for authentication and signing")
4142
.required(true)
@@ -45,18 +46,21 @@ fn main() -> Result<(), Box<dyn Error>> {
4546
)
4647
.arg(
4748
Arg::with_name("keystoreFile")
49+
.long("keystoreFile")
4850
.short("k")
4951
.help("path to the JSON Ethereum Keystore file")
5052
.takes_value(true),
5153
)
5254
.arg(
5355
Arg::with_name("dummyIdentity")
56+
.long("dummyIdentity")
5457
.short("i")
5558
.help("the identity to use with the dummy adapter")
5659
.takes_value(true),
5760
)
5861
.arg(
5962
Arg::with_name("sentryUrl")
63+
.long("sentryUrl")
6064
.short("u")
6165
.help("the URL to the sentry used for listing channels")
6266
.default_value("http://127.0.0.1:8005")
@@ -65,6 +69,7 @@ fn main() -> Result<(), Box<dyn Error>> {
6569
)
6670
.arg(
6771
Arg::with_name("singleTick")
72+
.long("singleTick")
6873
.short("t")
6974
.takes_value(false)
7075
.help("runs the validator in single-tick mode and exit"),
@@ -160,26 +165,26 @@ async fn iterate_channels<A: Adapter + 'static>(args: Args<A>, logger: &Logger)
160165
let channels = match result {
161166
Ok(channels) => channels,
162167
Err(e) => {
163-
error!(logger, "Failed to get channels {}", &e; "main" => "iterate_channels");
168+
error!(logger, "Failed to get channels - {}", &e; "main" => "iterate_channels");
164169
return;
165170
}
166171
};
167172

168173
let channels_size = channels.len();
169174

170-
let tick = try_join_all(
175+
let tick_results = join_all(
171176
channels
172177
.into_iter()
173178
.map(|channel| validator_tick(args.adapter.clone(), channel, &args.config, logger)),
174179
)
175180
.await;
176181

177-
info!(logger, "processed {} channels", channels_size);
178-
179-
if let Err(e) = tick {
180-
error!(logger, "An occurred while processing channels {}", &e; "main" => "iterate_channels");
182+
for channel_err in tick_results.into_iter().filter_map(Result::err) {
183+
error!(logger, "Error processing channels - {}", channel_err; "main" => "iterate_channels");
181184
}
182185

186+
info!(logger, "processed {} channels", channels_size);
187+
183188
if channels_size >= args.config.max_channels as usize {
184189
error!(logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached", &args.config.max_channels; "main" => "iterate_channels");
185190
}
@@ -197,23 +202,33 @@ async fn validator_tick<A: Adapter + 'static>(
197202
let duration = Duration::from_millis(config.validator_tick_timeout as u64);
198203

199204
match channel.spec.validators.find(&whoami) {
200-
SpecValidator::Leader(_) => {
201-
if let Err(e) = timeout(duration, leader::tick(&sentry)).await {
202-
return Err(ValidatorWorkerError::Failed(e.to_string()));
203-
}
204-
}
205-
SpecValidator::Follower(_) => {
206-
if let Err(e) = timeout(duration, follower::tick(&sentry)).await {
207-
return Err(ValidatorWorkerError::Failed(e.to_string()));
208-
}
209-
}
210-
SpecValidator::None => {
211-
return Err(ValidatorWorkerError::Failed(
212-
"validatorTick: processing a channel where we are not validating".to_string(),
213-
))
214-
}
215-
};
216-
Ok(())
205+
SpecValidator::Leader(_) => match timeout(duration, leader::tick(&sentry)).await {
206+
Err(timeout_e) => Err(ValidatorWorkerError::Channel(
207+
channel.id,
208+
timeout_e.to_string(),
209+
)),
210+
Ok(Err(tick_e)) => Err(ValidatorWorkerError::Channel(
211+
channel.id,
212+
tick_e.to_string(),
213+
)),
214+
_ => Ok(()),
215+
},
216+
SpecValidator::Follower(_) => match timeout(duration, follower::tick(&sentry)).await {
217+
Err(timeout_e) => Err(ValidatorWorkerError::Channel(
218+
channel.id,
219+
timeout_e.to_string(),
220+
)),
221+
Ok(Err(tick_e)) => Err(ValidatorWorkerError::Channel(
222+
channel.id,
223+
tick_e.to_string(),
224+
)),
225+
_ => Ok(()),
226+
},
227+
SpecValidator::None => Err(ValidatorWorkerError::Channel(
228+
channel.id,
229+
"validatorTick: processing a channel which we are not validating".to_string(),
230+
)),
231+
}
217232
}
218233

219234
fn logger() -> Logger {

0 commit comments

Comments
 (0)