Skip to content

Commit c42fc68

Browse files
committed
Cleaned up downlink failures module.
1 parent 4a0b739 commit c42fc68

File tree

7 files changed

+15
-107
lines changed

7 files changed

+15
-107
lines changed

runtime/swimos_runtime/src/agent/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,6 @@ impl AgentContext for AgentRuntimeContext {
294294
}
295295
}
296296

297-
298297
/// Reasons that a remote connected to an agent runtime task could be disconnected.
299298
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
300299
pub enum DisconnectionReason {

runtime/swimos_runtime/src/agent/task/external_links/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ use tracing::{debug, error, trace};
4545
use uuid::Uuid;
4646

4747
use crate::{
48-
agent::{CommanderKey, CommanderRequest, DownlinkRequest, LinkRequest}, Io,
48+
agent::{CommanderKey, CommanderRequest, DownlinkRequest, LinkRequest},
49+
Io,
4950
};
5051

5152
use super::{AdHocChannelRequest, ExternalLinkRequest};

runtime/swimos_runtime/src/agent/task/external_links/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use crate::{
1919
task::{external_links, AdHocChannelRequest, ExternalLinkRequest},
2020
CommanderKey, CommanderRequest, DownlinkRequest, LinkRequest,
2121
},
22-
downlink::DownlinkOptions, Io,
22+
downlink::DownlinkOptions,
23+
Io,
2324
};
2425
use bytes::Bytes;
2526
use futures::{

runtime/swimos_runtime/src/downlink/failure.rs

Lines changed: 5 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{convert::Infallible, fmt::Display, num::NonZeroUsize};
16-
17-
use swimos_utilities::format::comma_sep;
18-
use thiserror::Error;
19-
use tracing::warn;
15+
use std::{convert::Infallible, fmt::Display};
2016

17+
/// Configuration for the downlink runtime to instruct it how to respond to bad envelopes.
2118
#[derive(Debug, PartialEq, Eq)]
2219
pub enum BadFrameResponse<E> {
2320
/// Instruction to ignore the bad envelope.
@@ -63,6 +60,7 @@ impl<E: std::error::Error> BadFrameStrategy<E> for AlwaysAbortStrategy {
6360
}
6461
}
6562

63+
/// Error type returned by [`ReportStrategy`].
6664
#[derive(Debug)]
6765
pub struct ErrReport {
6866
message: String,
@@ -84,6 +82,7 @@ impl ErrReport {
8482
}
8583
}
8684

85+
/// [`BadFrameStrategy`] that creates an [`ErrReport`] from the error returned by the inner strategy.
8786
pub struct ReportStrategy<S> {
8887
inner: S,
8988
}
@@ -117,6 +116,7 @@ where
117116
}
118117
}
119118

119+
#[doc(hidden)]
120120
pub type BoxedReportStrategy<'a, E> = Box<dyn BadFrameStrategy<E, Report = ErrReport> + Send + 'a>;
121121

122122
impl<'a, E> BadFrameStrategy<E> for BoxedReportStrategy<'a, E> {
@@ -139,76 +139,10 @@ impl<E> BadFrameStrategy<E> for AlwaysIgnoreStrategy {
139139
}
140140
}
141141

142-
/// A strategy that will ignore several bad envelopes, collecting the errors, and then
143-
/// abort.
144-
pub struct CountStrategy<E> {
145-
max: usize,
146-
count: usize,
147-
errors: Vec<E>,
148-
}
149-
150-
impl<E> CountStrategy<E> {
151-
pub fn new(max: NonZeroUsize) -> Self {
152-
CountStrategy {
153-
max: max.get(),
154-
count: 0,
155-
errors: vec![],
156-
}
157-
}
158-
}
159-
160-
/// A collection of errors, accumulated by a [`CountStrategy`].
161-
#[derive(Debug, Error, PartialEq, Eq)]
162-
#[error("Too many bad frames: {errors}")]
163-
pub struct ErrorLog<E> {
164-
errors: Errors<E>,
165-
}
166-
167-
impl<E> AsRef<[E]> for ErrorLog<E> {
168-
fn as_ref(&self) -> &[E] {
169-
&self.errors.0
170-
}
171-
}
172-
173-
#[derive(Debug, PartialEq, Eq)]
174-
pub struct Errors<E>(Vec<E>);
175-
176-
impl<E: Display> Display for Errors<E> {
177-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178-
write!(f, "[{}]", comma_sep(&self.0))
179-
}
180-
}
181-
182-
impl<E: std::error::Error> BadFrameStrategy<E> for CountStrategy<E> {
183-
type Report = ErrorLog<E>;
184-
185-
fn failed_with(&mut self, error: E) -> BadFrameResponse<Self::Report> {
186-
let CountStrategy { max, count, errors } = self;
187-
*count += 1;
188-
if *count == *max {
189-
errors.push(error);
190-
*count = 0;
191-
BadFrameResponse::Abort(ErrorLog {
192-
errors: Errors(std::mem::take(errors)),
193-
})
194-
} else {
195-
warn!(
196-
"Received {n} of a maximum of {m} invalid map messages: {e}",
197-
n = *count,
198-
m = *max,
199-
e = error
200-
);
201-
errors.push(error);
202-
BadFrameResponse::Ignore
203-
}
204-
}
205-
}
206-
207142
#[cfg(test)]
208143
mod tests {
209144

210145
use super::*;
211-
use swimos_utilities::non_zero_usize;
212146
use thiserror::Error;
213147

214148
#[derive(Debug, Error, PartialEq, Eq)]
@@ -231,34 +165,4 @@ mod tests {
231165
let response = handler.failed_with(TestError("failed".to_string()));
232166
assert_eq!(response, BadFrameResponse::Ignore);
233167
}
234-
235-
const MAX: NonZeroUsize = non_zero_usize!(3);
236-
237-
#[test]
238-
fn count_errors() {
239-
let mut handler = CountStrategy::new(MAX);
240-
241-
let first = handler.failed_with(TestError("first".to_string()));
242-
assert_eq!(BadFrameResponse::Ignore, first);
243-
244-
let second = handler.failed_with(TestError("second".to_string()));
245-
assert_eq!(BadFrameResponse::Ignore, second);
246-
247-
let third = handler.failed_with(TestError("third".to_string()));
248-
match third {
249-
BadFrameResponse::Abort(report) => {
250-
assert_eq!(
251-
report.as_ref(),
252-
&[
253-
TestError("first".to_string()),
254-
TestError("second".to_string()),
255-
TestError("third".to_string())
256-
]
257-
)
258-
}
259-
BadFrameResponse::Ignore => {
260-
panic!("Error ignored.");
261-
}
262-
}
263-
}
264168
}

runtime/swimos_runtime/src/downlink/interpretation/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ impl DownlinkInterpretation for MapInterpretation {
9494
}
9595
}
9696

97+
/// Interpretation for map downlinks that does not interpret the events at all and passes through
98+
/// the frame unmodified.
9799
pub struct NoInterpretation;
98100
impl DownlinkInterpretation for NoInterpretation {
99101
type Error = Infallible;

runtime/swimos_runtime/src/downlink/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ use bytes::{Bytes, BytesMut};
2525
use futures::future::{join, select, Either};
2626
use futures::stream::SelectAll;
2727
use futures::{Future, FutureExt, Sink, SinkExt, Stream, StreamExt};
28-
pub use interpretation::MapInterpretation;
29-
pub use interpretation::NoInterpretation;
28+
use interpretation::MapInterpretation;
3029
use swimos_agent_protocol::{
3130
encoding::downlink::DownlinkNotificationEncoder, DownlinkNotification,
3231
};
@@ -47,6 +46,8 @@ use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
4746
use tracing::{error, info, info_span, trace, warn, Instrument};
4847
use uuid::Uuid;
4948

49+
pub use interpretation::NoInterpretation;
50+
5051
mod backpressure;
5152
pub mod failure;
5253
mod interpretation;

runtime/swimos_runtime/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ pub mod downlink;
2828
mod timeout_coord;
2929

3030
/// Ends of two independent channels (for example the input and output channels of an agent).
31-
type Io = (ByteWriter, ByteReader);
31+
type Io = (ByteWriter, ByteReader);

0 commit comments

Comments
 (0)