Skip to content

Commit 160911b

Browse files
authored
Merge pull request #617 from swimos/map_downlink_fix
Resolves incorrect map downlink codec usage.
2 parents e141d14 + 72bec4b commit 160911b

File tree

8 files changed

+223
-175
lines changed

8 files changed

+223
-175
lines changed

client/runtime/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ where
581581
let downlink = TrackingMapDownlink::new(
582582
spawned.clone(),
583583
stopped.clone(),
584-
MapDownlinkModel::new(set_rx, lifecycle, false),
584+
MapDownlinkModel::new(set_rx, lifecycle),
585585
);
586586

587587
let promise = handle

client/swimos_client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ impl<'h, L> MapDownlinkBuilder<'h, L> {
446446
} = self;
447447

448448
let (tx, rx) = mpsc::channel(downlink_config.buffer_size.get());
449-
let task = DownlinkTask::new(MapDownlinkModel::new(rx, lifecycle, true));
449+
let task = DownlinkTask::new(MapDownlinkModel::new(rx, lifecycle));
450450
let stop_rx = handle
451451
.inner
452452
.run_downlink(path, runtime_config, downlink_config, options, task)

swimos_downlink/src/model/mod.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,14 @@ impl<T, LC> EventDownlinkModel<T, LC> {
6464
pub struct MapDownlinkModel<K, V, LC> {
6565
pub actions: mpsc::Receiver<MapMessage<K, V>>,
6666
pub lifecycle: LC,
67-
pub remote: bool,
6867
}
6968

7069
impl<K, V, LC> MapDownlinkModel<K, V, LC> {
7170
pub fn new(
7271
actions: mpsc::Receiver<MapMessage<K, V>>,
7372
lifecycle: LC,
74-
remote: bool,
7573
) -> MapDownlinkModel<K, V, LC> {
76-
MapDownlinkModel {
77-
actions,
78-
lifecycle,
79-
remote,
80-
}
74+
MapDownlinkModel { actions, lifecycle }
8175
}
8276
}
8377

@@ -105,9 +99,8 @@ pub fn event_downlink<T>() -> DefaultEventDownlinkModel<T> {
10599

106100
pub fn map_downlink<K, V>(
107101
actions: mpsc::Receiver<MapMessage<K, V>>,
108-
remote: bool,
109102
) -> DefaultMapDownlinkModel<K, V> {
110-
MapDownlinkModel::new(actions, Default::default(), remote)
103+
MapDownlinkModel::new(actions, Default::default())
111104
}
112105

113106
#[derive(Debug)]
@@ -220,16 +213,11 @@ where
220213
F: Fn(LC) -> LC2,
221214
LC2: MapDownlinkLifecycle<K, V>,
222215
{
223-
let MapDownlinkModel {
224-
actions,
225-
lifecycle,
226-
remote,
227-
} = self;
216+
let MapDownlinkModel { actions, lifecycle } = self;
228217

229218
MapDownlinkModel {
230219
actions,
231220
lifecycle: f(lifecycle),
232-
remote,
233221
}
234222
}
235223
}

swimos_downlink/src/task/map.rs

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use swimos_api::downlink::DownlinkConfig;
2323
use swimos_api::error::DownlinkTaskError;
2424
use swimos_api::protocol::downlink::{
2525
DownlinkNotification, DownlinkOperation, DownlinkOperationEncoder, MapNotificationDecoder,
26-
ValueNotificationDecoder,
2726
};
2827
use swimos_api::protocol::map::MapMessage;
2928
use swimos_form::structural::write::StructuralWritable;
@@ -36,7 +35,7 @@ use tokio::select;
3635
use tokio::sync::mpsc;
3736
use tokio_stream::wrappers::ReceiverStream;
3837
use tokio_util::codec::{Decoder, FramedRead, FramedWrite};
39-
use tracing::{info_span, trace, Instrument};
38+
use tracing::{error, info_span, trace, Instrument};
4039

4140
/// Task to drive a map downlink, calling lifecycle events at appropriate points.
4241
///
@@ -60,35 +59,18 @@ where
6059
V::Rec: Send,
6160
LC: MapDownlinkLifecycle<K, V>,
6261
{
63-
let MapDownlinkModel {
64-
actions,
65-
lifecycle,
66-
remote,
67-
} = model;
62+
let MapDownlinkModel { actions, lifecycle } = model;
6863

69-
if remote {
70-
run_io(
71-
config,
72-
input,
73-
lifecycle,
74-
FramedWrite::new(output, DownlinkOperationEncoder),
75-
actions,
76-
ValueNotificationDecoder::default(),
77-
)
78-
.instrument(info_span!("Downlink IO task.", %path))
79-
.await
80-
} else {
81-
run_io(
82-
config,
83-
input,
84-
lifecycle,
85-
FramedWrite::new(output, DownlinkOperationEncoder),
86-
actions,
87-
MapNotificationDecoder::default(),
88-
)
89-
.instrument(info_span!("Downlink IO task.", %path))
90-
.await
91-
}
64+
run_io(
65+
config,
66+
input,
67+
lifecycle,
68+
FramedWrite::new(output, DownlinkOperationEncoder),
69+
actions,
70+
MapNotificationDecoder::default(),
71+
)
72+
.instrument(info_span!("Downlink IO task.", %path))
73+
.await
9274
}
9375

9476
/// The current state of the downlink.
@@ -141,6 +123,7 @@ where
141123
V::Rec: Send,
142124
LC: MapDownlinkLifecycle<K, V>,
143125
Snk: Sink<DownlinkOperation<MapMessage<K, V>>> + Unpin,
126+
Snk::Error: Debug,
144127
D: Decoder<Item = DownlinkNotification<MapMessage<K, V>>, Error = E>,
145128
DownlinkTaskError: From<E>,
146129
E: Debug,
@@ -159,9 +142,7 @@ where
159142
write = (&mut write_fut) => IoEvent::Write(write),
160143
read_event = framed_read.next() => match read_event {
161144
Some(Ok(notification)) => IoEvent::Read(notification),
162-
Some(Err(e)) => {
163-
break Err(e.into())
164-
} ,
145+
Some(Err(e)) => break Err(e.into()),
165146
None => break Ok(()),
166147
}
167148
};
@@ -196,8 +177,8 @@ where
196177
}
197178

198179
let op = DownlinkOperation::new(message);
199-
if framed.feed(op).await.is_err() {
200-
mode = Mode::Read;
180+
if let Err(e) = framed.feed(op).await {
181+
error!(error = ?e, "Failed to feed downlink frame. Transitioning to read-only mode");
201182
}
202183
}
203184
IoEvent::Write(None) => mode = Mode::Read,

swimos_downlink/src/task/tests/event.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use swimos_api::{downlink::DownlinkConfig, protocol::downlink::DownlinkNotificat
2020
use swimos_utilities::non_zero_usize;
2121
use tokio::sync::mpsc;
2222

23-
use super::run_downlink_task;
23+
use super::run_value_downlink_task;
2424
use crate::model::lifecycle::{BasicEventDownlinkLifecycle, EventDownlinkLifecycle};
2525
use crate::{DownlinkTask, EventDownlinkModel};
2626

@@ -69,7 +69,7 @@ async fn link_downlink() {
6969
buffer_size: DEEFAULT_BUFFER_SIZE,
7070
};
7171

72-
let result = run_downlink_task(
72+
let result = run_value_downlink_task(
7373
DownlinkTask::new(model),
7474
config,
7575
|mut writer, reader| async move {
@@ -96,7 +96,7 @@ async fn message_before_linked() {
9696
buffer_size: DEEFAULT_BUFFER_SIZE,
9797
};
9898

99-
let result = run_downlink_task(
99+
let result = run_value_downlink_task(
100100
DownlinkTask::new(model),
101101
config,
102102
|mut writer, reader| async move {
@@ -126,7 +126,7 @@ async fn message_after_linked() {
126126
buffer_size: DEEFAULT_BUFFER_SIZE,
127127
};
128128

129-
let result = run_downlink_task(
129+
let result = run_value_downlink_task(
130130
DownlinkTask::new(model),
131131
config,
132132
|mut writer, reader| async move {
@@ -157,7 +157,7 @@ async fn terminate_after_unlinked() {
157157
buffer_size: DEEFAULT_BUFFER_SIZE,
158158
};
159159

160-
let result = run_downlink_task(
160+
let result = run_value_downlink_task(
161161
DownlinkTask::new(model),
162162
config,
163163
|mut writer, reader| async move {
@@ -197,7 +197,7 @@ async fn terminate_after_corrupt_frame() {
197197
buffer_size: DEEFAULT_BUFFER_SIZE,
198198
};
199199

200-
let result = run_downlink_task(
200+
let result = run_value_downlink_task(
201201
DownlinkTask::new(model),
202202
config,
203203
|mut writer, reader| async move {
@@ -228,7 +228,7 @@ async fn relink_downlink() {
228228
buffer_size: DEEFAULT_BUFFER_SIZE,
229229
};
230230

231-
let result = run_downlink_task(
231+
let result = run_value_downlink_task(
232232
DownlinkTask::new(model),
233233
config,
234234
|mut writer, reader| async move {

0 commit comments

Comments
 (0)