Skip to content

Commit 53a63f7

Browse files
committed
Removes remote argument for map downlinks
1 parent affbec3 commit 53a63f7

File tree

8 files changed

+325
-225
lines changed

8 files changed

+325
-225
lines changed

client/runtime/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ where
581581
let downlink = TrackingMapDownlink::new(
582582
spawned.clone(),
583583
stopped.clone(),
584-
MapDownlinkModel::new(set_rx, lifecycle, true),
584+
MapDownlinkModel::new(set_rx, lifecycle),
585585
);
586586

587587
let promise = handle

client/swimos_client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ impl<'h, L> MapDownlinkBuilder<'h, L> {
446446
} = self;
447447

448448
let (tx, rx) = mpsc::channel(downlink_config.buffer_size.get());
449-
let task = DownlinkTask::new(MapDownlinkModel::new(rx, lifecycle, true));
449+
let task = DownlinkTask::new(MapDownlinkModel::new(rx, lifecycle));
450450
let stop_rx = handle
451451
.inner
452452
.run_downlink(path, runtime_config, downlink_config, options, task)

swimos_downlink/src/model/mod.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,14 @@ impl<T, LC> EventDownlinkModel<T, LC> {
6464
pub struct MapDownlinkModel<K, V, LC> {
6565
pub actions: mpsc::Receiver<MapMessage<K, V>>,
6666
pub lifecycle: LC,
67-
pub remote: bool,
6867
}
6968

7069
impl<K, V, LC> MapDownlinkModel<K, V, LC> {
7170
pub fn new(
7271
actions: mpsc::Receiver<MapMessage<K, V>>,
7372
lifecycle: LC,
74-
remote: bool,
7573
) -> MapDownlinkModel<K, V, LC> {
76-
MapDownlinkModel {
77-
actions,
78-
lifecycle,
79-
remote,
80-
}
74+
MapDownlinkModel { actions, lifecycle }
8175
}
8276
}
8377

@@ -105,9 +99,8 @@ pub fn event_downlink<T>() -> DefaultEventDownlinkModel<T> {
10599

106100
pub fn map_downlink<K, V>(
107101
actions: mpsc::Receiver<MapMessage<K, V>>,
108-
remote: bool,
109102
) -> DefaultMapDownlinkModel<K, V> {
110-
MapDownlinkModel::new(actions, Default::default(), remote)
103+
MapDownlinkModel::new(actions, Default::default())
111104
}
112105

113106
#[derive(Debug)]
@@ -220,16 +213,11 @@ where
220213
F: Fn(LC) -> LC2,
221214
LC2: MapDownlinkLifecycle<K, V>,
222215
{
223-
let MapDownlinkModel {
224-
actions,
225-
lifecycle,
226-
remote,
227-
} = self;
216+
let MapDownlinkModel { actions, lifecycle } = self;
228217

229218
MapDownlinkModel {
230219
actions,
231220
lifecycle: f(lifecycle),
232-
remote,
233221
}
234222
}
235223
}

swimos_downlink/src/task/map.rs

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use swimos_api::downlink::DownlinkConfig;
2323
use swimos_api::error::DownlinkTaskError;
2424
use swimos_api::protocol::downlink::{
2525
DownlinkNotification, DownlinkOperation, DownlinkOperationEncoder, MapNotificationDecoder,
26-
ValueNotificationDecoder,
2726
};
2827
use swimos_api::protocol::map::MapMessage;
2928
use swimos_form::structural::write::StructuralWritable;
@@ -36,7 +35,7 @@ use tokio::select;
3635
use tokio::sync::mpsc;
3736
use tokio_stream::wrappers::ReceiverStream;
3837
use tokio_util::codec::{Decoder, FramedRead, FramedWrite};
39-
use tracing::{info_span, trace, Instrument};
38+
use tracing::{error, info_span, trace, Instrument};
4039

4140
/// Task to drive a map downlink, calling lifecycle events at appropriate points.
4241
///
@@ -60,35 +59,18 @@ where
6059
V::Rec: Send,
6160
LC: MapDownlinkLifecycle<K, V>,
6261
{
63-
let MapDownlinkModel {
64-
actions,
65-
lifecycle,
66-
remote,
67-
} = model;
62+
let MapDownlinkModel { actions, lifecycle } = model;
6863

69-
if remote {
70-
run_io(
71-
config,
72-
input,
73-
lifecycle,
74-
FramedWrite::new(output, DownlinkOperationEncoder),
75-
actions,
76-
MapNotificationDecoder::default(),
77-
)
78-
.instrument(info_span!("Downlink IO task.", %path))
79-
.await
80-
} else {
81-
run_io(
82-
config,
83-
input,
84-
lifecycle,
85-
FramedWrite::new(output, DownlinkOperationEncoder),
86-
actions,
87-
ValueNotificationDecoder::default(),
88-
)
89-
.instrument(info_span!("Downlink IO task.", %path))
90-
.await
91-
}
64+
run_io(
65+
config,
66+
input,
67+
lifecycle,
68+
FramedWrite::new(output, DownlinkOperationEncoder),
69+
actions,
70+
MapNotificationDecoder::default(),
71+
)
72+
.instrument(info_span!("Downlink IO task.", %path))
73+
.await
9274
}
9375

9476
/// The current state of the downlink.
@@ -141,6 +123,7 @@ where
141123
V::Rec: Send,
142124
LC: MapDownlinkLifecycle<K, V>,
143125
Snk: Sink<DownlinkOperation<MapMessage<K, V>>> + Unpin,
126+
Snk::Error: Debug,
144127
D: Decoder<Item = DownlinkNotification<MapMessage<K, V>>, Error = E>,
145128
DownlinkTaskError: From<E>,
146129
E: Debug,
@@ -159,9 +142,7 @@ where
159142
write = (&mut write_fut) => IoEvent::Write(write),
160143
read_event = framed_read.next() => match read_event {
161144
Some(Ok(notification)) => IoEvent::Read(notification),
162-
Some(Err(e)) => {
163-
break Err(e.into())
164-
} ,
145+
Some(Err(e)) => break Err(e.into()),
165146
None => break Ok(()),
166147
}
167148
};
@@ -196,8 +177,8 @@ where
196177
}
197178

198179
let op = DownlinkOperation::new(message);
199-
if framed.feed(op).await.is_err() {
200-
mode = Mode::Read;
180+
if let Err(e) = framed.feed(op).await {
181+
error!(error = ?e, "Failed to feed downlink frame. Transitioning to read-only mode");
201182
}
202183
}
203184
IoEvent::Write(None) => mode = Mode::Read,

swimos_downlink/src/task/tests/event.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use swimos_api::{downlink::DownlinkConfig, protocol::downlink::DownlinkNotificat
2020
use swimos_utilities::non_zero_usize;
2121
use tokio::sync::mpsc;
2222

23-
use super::run_downlink_task;
23+
use super::run_value_downlink_task;
2424
use crate::model::lifecycle::{BasicEventDownlinkLifecycle, EventDownlinkLifecycle};
2525
use crate::{DownlinkTask, EventDownlinkModel};
2626

@@ -69,12 +69,14 @@ async fn link_downlink() {
6969
buffer_size: DEEFAULT_BUFFER_SIZE,
7070
};
7171

72-
let result = run_downlink_task(
72+
let result = run_value_downlink_task(
7373
DownlinkTask::new(model),
7474
config,
7575
|mut writer, reader| async move {
7676
let _reader = reader;
77-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
77+
writer
78+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
79+
.await;
7880
expect_event(&mut event_rx, TestMessage::Linked).await;
7981
event_rx
8082
},
@@ -96,15 +98,17 @@ async fn message_before_linked() {
9698
buffer_size: DEEFAULT_BUFFER_SIZE,
9799
};
98100

99-
let result = run_downlink_task(
101+
let result = run_value_downlink_task(
100102
DownlinkTask::new(model),
101103
config,
102104
|mut writer, reader| async move {
103105
let _reader = reader;
104106
writer
105-
.send_value::<i32>(DownlinkNotification::Event { body: 9 })
107+
.send_scalar_value::<i32>(DownlinkNotification::Event { body: 9 })
108+
.await;
109+
writer
110+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
106111
.await;
107-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
108112
expect_event(&mut event_rx, TestMessage::Linked).await;
109113
event_rx
110114
},
@@ -126,14 +130,16 @@ async fn message_after_linked() {
126130
buffer_size: DEEFAULT_BUFFER_SIZE,
127131
};
128132

129-
let result = run_downlink_task(
133+
let result = run_value_downlink_task(
130134
DownlinkTask::new(model),
131135
config,
132136
|mut writer, reader| async move {
133137
let _reader = reader;
134-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
135138
writer
136-
.send_value::<i32>(DownlinkNotification::Event { body: 9 })
139+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
140+
.await;
141+
writer
142+
.send_scalar_value::<i32>(DownlinkNotification::Event { body: 9 })
137143
.await;
138144
expect_event(&mut event_rx, TestMessage::Linked).await;
139145
expect_event(&mut event_rx, TestMessage::Event(9)).await;
@@ -157,16 +163,18 @@ async fn terminate_after_unlinked() {
157163
buffer_size: DEEFAULT_BUFFER_SIZE,
158164
};
159165

160-
let result = run_downlink_task(
166+
let result = run_value_downlink_task(
161167
DownlinkTask::new(model),
162168
config,
163169
|mut writer, reader| async move {
164-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
165170
writer
166-
.send_value::<i32>(DownlinkNotification::Event { body: 9 })
171+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
172+
.await;
173+
writer
174+
.send_scalar_value::<i32>(DownlinkNotification::Event { body: 9 })
167175
.await;
168176
writer
169-
.send_value::<i32>(DownlinkNotification::Unlinked)
177+
.send_scalar_value::<i32>(DownlinkNotification::Unlinked)
170178
.await;
171179
expect_event(&mut event_rx, TestMessage::Linked).await;
172180
expect_event(&mut event_rx, TestMessage::Event(9)).await;
@@ -197,13 +205,15 @@ async fn terminate_after_corrupt_frame() {
197205
buffer_size: DEEFAULT_BUFFER_SIZE,
198206
};
199207

200-
let result = run_downlink_task(
208+
let result = run_value_downlink_task(
201209
DownlinkTask::new(model),
202210
config,
203211
|mut writer, reader| async move {
204-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
212+
writer
213+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
214+
.await;
205215
expect_event(&mut event_rx, TestMessage::Linked).await;
206-
writer.send_corrupted_frame().await;
216+
writer.send_corrupted_scalar_frame().await;
207217
(writer, reader, event_rx)
208218
},
209219
)
@@ -228,24 +238,28 @@ async fn relink_downlink() {
228238
buffer_size: DEEFAULT_BUFFER_SIZE,
229239
};
230240

231-
let result = run_downlink_task(
241+
let result = run_value_downlink_task(
232242
DownlinkTask::new(model),
233243
config,
234244
|mut writer, reader| async move {
235245
let _reader = reader;
236-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
237246
writer
238-
.send_value::<i32>(DownlinkNotification::Event { body: 9 })
247+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
248+
.await;
249+
writer
250+
.send_scalar_value::<i32>(DownlinkNotification::Event { body: 9 })
251+
.await;
252+
writer
253+
.send_scalar_value::<i32>(DownlinkNotification::Unlinked)
239254
.await;
240255
writer
241-
.send_value::<i32>(DownlinkNotification::Unlinked)
256+
.send_scalar_value::<i32>(DownlinkNotification::Event { body: 10 })
242257
.await;
243258
writer
244-
.send_value::<i32>(DownlinkNotification::Event { body: 10 })
259+
.send_scalar_value::<i32>(DownlinkNotification::Linked)
245260
.await;
246-
writer.send_value::<i32>(DownlinkNotification::Linked).await;
247261
writer
248-
.send_value::<i32>(DownlinkNotification::Event { body: 11 })
262+
.send_scalar_value::<i32>(DownlinkNotification::Event { body: 11 })
249263
.await;
250264
expect_event(&mut event_rx, TestMessage::Linked).await;
251265
expect_event(&mut event_rx, TestMessage::Event(9)).await;

0 commit comments

Comments
 (0)