Skip to content

Commit ae2500a

Browse files
Add lots of extra error contexts
1 parent efacc9f commit ae2500a

File tree

4 files changed

+49
-22
lines changed

4 files changed

+49
-22
lines changed

src/archive.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,12 @@ impl Archive {
103103
let header = block.header.as_ref().expect("must have header");
104104

105105
// Save the raw bytes of the block, indexed by its hash
106-
self.save_raw_block(&header.hash, bytes).await?;
106+
self.save_raw_block(&header.hash, bytes)
107+
.await
108+
.context(format!(
109+
"failed to save raw block {}",
110+
hex::encode(&header.hash)
111+
))?;
107112

108113
// Then, save various lookups in dynamodb
109114
let location = block_hash_key(&header.hash);

src/broadcast/broadcast.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::{SystemTime, UNIX_EPOCH};
22

3-
use anyhow::Result;
3+
use anyhow::{Context, Result};
44
use aws_sdk_dynamodb::{types::AttributeValue, Client as DynamoClient};
55
use aws_sdk_kinesis::Client as KinesisClient;
66
use aws_sdk_s3::primitives::Blob;
@@ -85,7 +85,8 @@ impl Broadcaster {
8585
.as_millis() as u64;
8686
now < *deadline
8787
})
88-
.await?;
88+
.await
89+
.context("failed checking for deadline")?;
8990

9091
// then send to kinesis, and save the point/seq number back to the destination
9192
let result = self
@@ -95,15 +96,17 @@ impl Broadcaster {
9596
.data(Blob::new(message_bytes.clone()))
9697
.stream_arn(&destination.stream_arn)
9798
.send()
98-
.await?;
99+
.await
100+
.context("failed writing to kinesis")?;
99101
destination
100102
.commit(
101103
&self.dynamo,
102104
&self.table,
103105
message.advance.clone(),
104106
Some(result.sequence_number),
105107
)
106-
.await?;
108+
.await
109+
.context(format!("failed committing destination {}", destination.pk))?;
107110
destinations.push(destination.pk.clone());
108111
} else {
109112
// If the block doesn't apply, we still advance the point
@@ -117,7 +120,11 @@ impl Broadcaster {
117120
message.advance.clone(),
118121
destination.sequence_number.clone(),
119122
)
120-
.await?;
123+
.await
124+
.context(format!(
125+
"failed advancing sequence number for destination {}",
126+
destination.pk
127+
))?;
121128
}
122129
}
123130
Ok(destinations)

src/broadcast/destination.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::broadcast::BroadcastMessage;
22

33
use super::filter::FilterConfig;
4-
use anyhow::Result;
4+
use anyhow::{Context, Result};
55
use aws_sdk_dynamodb::{types::AttributeValue, Client as DynamoClient};
66
use aws_sdk_kinesis::{types::ShardIteratorType, Client as KinesisClient};
77
use bytes::Bytes;
@@ -75,7 +75,8 @@ impl Destination {
7575
.expression_attribute_values(":new_point", AttributeValue::S(point_to_string(&point)))
7676
.expression_attribute_values(":rotated_points", AttributeValue::L(recovery_points))
7777
.send()
78-
.await?;
78+
.await
79+
.context(format!("failed to update destination {}", self.pk))?;
7980
Ok(())
8081
}
8182

@@ -107,21 +108,26 @@ impl Destination {
107108
}
108109
let mut iterator = shard_request
109110
.send()
110-
.await?
111+
.await
112+
.context("failed to request shard iterator")?
111113
.shard_iterator
112-
.expect("failed to get shard iterator");
114+
.context("shard iterator is none")?;
113115
info!("Repairing destination {}", self.pk);
114116
loop {
115117
let records = kinesis
116118
.get_records()
117119
.stream_arn(&self.stream_arn)
118120
.shard_iterator(&iterator)
119121
.send()
120-
.await?;
122+
.await
123+
.context(format!(
124+
"failed to fetch records from stream {}",
125+
self.stream_arn
126+
))?;
121127

122128
iterator = records
123129
.next_shard_iterator
124-
.expect("stream should be provisioned");
130+
.context("next shard iterator is none")?;
125131

126132
info!(
127133
"Received {} records, {:?}ms behind tip",
@@ -133,9 +139,12 @@ impl Destination {
133139
let last_record = records.records.into_iter().last().unwrap();
134140
let seq_no = last_record.sequence_number;
135141
let data = last_record.data.into_inner();
136-
let message: BroadcastMessage = serde_json::from_slice(data.as_slice())?;
142+
let message: BroadcastMessage =
143+
serde_json::from_slice(data.as_slice()).context("failed to parse data")?;
137144
let advance = message.advance;
138-
self.commit(&dynamo, &table, advance, Some(seq_no)).await?;
145+
self.commit(&dynamo, &table, advance, Some(seq_no))
146+
.await
147+
.context("failed to commit while repairing")?;
139148
}
140149

141150
if millis_behind_latest == 0 {

src/worker/worker.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
broadcast::{BroadcastMessage, Broadcaster},
1414
utils::elapsed,
1515
};
16-
use anyhow::{bail, Result};
16+
use anyhow::{bail, Context, Result};
1717

1818
pub struct Worker {
1919
pub dynamo: DynamoClient,
@@ -36,14 +36,18 @@ impl Worker {
3636
self.kinesis.clone(),
3737
lock_deadline,
3838
)
39-
.await?;
39+
.await
40+
.context("failed to start broadcaster")?;
4041

4142
if broadcaster.destinations.is_empty() {
4243
warn!("No enabled destinations, nothing to do");
4344
return Ok(());
4445
}
4546

46-
broadcaster.repair().await?;
47+
broadcaster
48+
.repair()
49+
.await
50+
.context("failed to repair broadcaster")?;
4751

4852
let earliest_point = broadcaster
4953
.destinations
@@ -66,7 +70,9 @@ impl Worker {
6670
.to_vec()
6771
.encode_hex::<String>()
6872
);
69-
let mut follower = Follower::new(&self.uri, &self.api_key, intersect).await?;
73+
let mut follower = Follower::new(&self.uri, &self.api_key, intersect)
74+
.await
75+
.context("failed to start follower")?;
7076

7177
let mut undo_stack = vec![];
7278

@@ -77,7 +83,7 @@ impl Worker {
7783
bail!("No block in 5 minutes, failing over to another node");
7884
},
7985
result = follower.next_event() => {
80-
let (is_roll_forward, bytes, block, header) = result?;
86+
let (is_roll_forward, bytes, block, header) = result.context("failed to receive next event from follower")?;
8187
let block_hash: String = header.hash.encode_hex();
8288

8389
let start = SystemTime::now();
@@ -86,19 +92,19 @@ impl Worker {
8692
hash: header.hash,
8793
};
8894
if is_roll_forward {
89-
self.archive.save(&block, bytes.to_vec()).await?;
95+
self.archive.save(&block, bytes.to_vec()).await.context(format!("failed to archive {}/{}", point.index, block_hash))?;
9096

9197
let start = SystemTime::now();
9298
let destinations = broadcaster.broadcast(block, BroadcastMessage {
9399
undo: undo_stack.clone(),
94100
advance: point.clone(),
95-
}).await?;
101+
}).await.context(format!("failed to broadcast point {}/{}", point.index, block_hash))?;
96102
trace!("Message broadcast (elapsed={:?})", SystemTime::now().duration_since(start)?);
97103
undo_stack.clear();
98104
info!("Roll forward {}/{} ({})", point.index, block_hash, destinations.join(", "));
99105
} else {
100106
trace!("Unsaving {}/{}", point.index, block_hash);
101-
self.archive.unsave(&block).await?;
107+
self.archive.unsave(&block).await.context(format!("failed to unsave point {}/{}", point.index, block_hash))?;
102108
trace!("Block {}/{} unsaved (elapsed={:?})", point.index, block_hash, elapsed(start));
103109
undo_stack.push(point.clone());
104110
info!("Undo block {}/{}", point.index, block_hash);

0 commit comments

Comments
 (0)