Skip to content

Commit 274d92e

Browse files
author
Ivo Georgiev
authored
Merge pull request #273 from AdExNetwork/validator_worker-timeout-errors
Validator worker timeout errors
2 parents 4ee70b7 + ff77822 commit 274d92e

File tree

2 files changed

+37
-22
lines changed

2 files changed

+37
-22
lines changed

validator_worker/src/error.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use primitives::ChannelId;
12
use serde::{Deserialize, Serialize};
23
use std::error::Error;
34
use std::fmt::{Display, Formatter, Result};
@@ -6,15 +7,19 @@ use std::fmt::{Display, Formatter, Result};
67
pub enum ValidatorWorker {
78
Configuration(String),
89
Failed(String),
10+
Channel(ChannelId, String),
911
}
1012

1113
impl Error for ValidatorWorker {}
1214

1315
impl Display for ValidatorWorker {
1416
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
1517
match self {
16-
ValidatorWorker::Configuration(error) => write!(f, "Configuration error: {}", error),
17-
ValidatorWorker::Failed(error) => write!(f, "error: {}", error),
18+
ValidatorWorker::Configuration(err) => write!(f, "Configuration error: {}", err),
19+
ValidatorWorker::Failed(err) => write!(f, "error: {}", err),
20+
ValidatorWorker::Channel(channel_id, err) => {
21+
write!(f, "Channel {}: {}", channel_id, err)
22+
}
1823
}
1924
}
2025
}

validator_worker/src/main.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::error::Error;
66
use std::time::Duration;
77

88
use clap::{App, Arg};
9-
use futures::future::{join, try_join_all};
9+
use futures::future::{join, join_all};
1010
use tokio::runtime::Runtime;
1111
use tokio::time::{delay_for, timeout};
1212

@@ -165,26 +165,26 @@ async fn iterate_channels<A: Adapter + 'static>(args: Args<A>, logger: &Logger)
165165
let channels = match result {
166166
Ok(channels) => channels,
167167
Err(e) => {
168-
error!(logger, "Failed to get channels {}", &e; "main" => "iterate_channels");
168+
error!(logger, "Failed to get channels - {}", &e; "main" => "iterate_channels");
169169
return;
170170
}
171171
};
172172

173173
let channels_size = channels.len();
174174

175-
let tick = try_join_all(
175+
let tick_results = join_all(
176176
channels
177177
.into_iter()
178178
.map(|channel| validator_tick(args.adapter.clone(), channel, &args.config, logger)),
179179
)
180180
.await;
181181

182-
info!(logger, "processed {} channels", channels_size);
183-
184-
if let Err(e) = tick {
185-
error!(logger, "An occurred while processing channels {}", &e; "main" => "iterate_channels");
182+
for channel_err in tick_results.into_iter().filter_map(Result::err) {
183+
error!(logger, "Error processing channels - {}", channel_err; "main" => "iterate_channels");
186184
}
187185

186+
info!(logger, "processed {} channels", channels_size);
187+
188188
if channels_size >= args.config.max_channels as usize {
189189
error!(logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached", &args.config.max_channels; "main" => "iterate_channels");
190190
}
@@ -203,22 +203,32 @@ async fn validator_tick<A: Adapter + 'static>(
203203

204204
match channel.spec.validators.find(&whoami) {
205205
SpecValidator::Leader(_) => match timeout(duration, leader::tick(&sentry)).await {
206-
Err(e) => return Err(ValidatorWorkerError::Failed(e.to_string())),
207-
Ok(Err(e)) => return Err(ValidatorWorkerError::Failed(e.to_string())),
208-
_ => (),
206+
Err(timeout_e) => Err(ValidatorWorkerError::Channel(
207+
channel.id,
208+
timeout_e.to_string(),
209+
)),
210+
Ok(Err(tick_e)) => Err(ValidatorWorkerError::Channel(
211+
channel.id,
212+
tick_e.to_string(),
213+
)),
214+
_ => Ok(()),
209215
},
210216
SpecValidator::Follower(_) => match timeout(duration, follower::tick(&sentry)).await {
211-
Err(e) => return Err(ValidatorWorkerError::Failed(e.to_string())),
212-
Ok(Err(e)) => return Err(ValidatorWorkerError::Failed(e.to_string())),
213-
_ => (),
217+
Err(timeout_e) => Err(ValidatorWorkerError::Channel(
218+
channel.id,
219+
timeout_e.to_string(),
220+
)),
221+
Ok(Err(tick_e)) => Err(ValidatorWorkerError::Channel(
222+
channel.id,
223+
tick_e.to_string(),
224+
)),
225+
_ => Ok(()),
214226
},
215-
SpecValidator::None => {
216-
return Err(ValidatorWorkerError::Failed(
217-
"validatorTick: processing a channel where we are not validating".to_string(),
218-
))
219-
}
220-
};
221-
Ok(())
227+
SpecValidator::None => Err(ValidatorWorkerError::Channel(
228+
channel.id,
229+
"validatorTick: processing a channel which we are not validating".to_string(),
230+
)),
231+
}
222232
}
223233

224234
fn logger() -> Logger {

0 commit comments

Comments
 (0)