Skip to content

Commit 8f7842c

Browse files
committed
Resolves PR comment
1 parent 5c59b2a commit 8f7842c

File tree

2 files changed

+21
-79
lines changed

2 files changed

+21
-79
lines changed

server/swim_agent/src/lanes/supply/mod.rs

Lines changed: 19 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,13 @@ mod tests;
3939
struct SupplyLaneInner<T> {
4040
sync_queue: VecDeque<Uuid>,
4141
event_queue: VecDeque<T>,
42-
next: EventOrSync,
4342
}
4443

4544
impl<T> Default for SupplyLaneInner<T> {
4645
fn default() -> SupplyLaneInner<T> {
4746
SupplyLaneInner {
4847
sync_queue: Default::default(),
4948
event_queue: Default::default(),
50-
next: Default::default(),
5149
}
5250
}
5351
}
@@ -90,59 +88,8 @@ impl<T> AgentItem for SupplyLane<T> {
9088
}
9189
}
9290

93-
#[derive(Debug, Default)]
94-
enum EventOrSync {
95-
#[default]
96-
Event,
97-
Sync,
98-
}
99-
100-
impl EventOrSync {
101-
fn negate(&mut self) {
102-
match self {
103-
EventOrSync::Event => *self = EventOrSync::Sync,
104-
EventOrSync::Sync => *self = EventOrSync::Event,
105-
}
106-
}
107-
}
108-
10991
const INFALLIBLE_SER: &str = "Serializing a value to recon should be infallible.";
11092

111-
fn write<F, I, T, O>(
112-
buffer: &mut BytesMut,
113-
poll_queue: &mut VecDeque<I>,
114-
on_empty_queue: &mut VecDeque<O>,
115-
state: &mut EventOrSync,
116-
wrap: F,
117-
) -> Option<WriteResult>
118-
where
119-
F: Fn(I) -> LaneResponse<T>,
120-
T: StructuralWritable,
121-
{
122-
match poll_queue.pop_front() {
123-
Some(elem) => {
124-
let mut encoder = ValueLaneResponseEncoder::default();
125-
encoder.encode(wrap(elem), buffer).expect(INFALLIBLE_SER);
126-
if !on_empty_queue.is_empty() {
127-
state.negate();
128-
Some(WriteResult::DataStillAvailable)
129-
} else if poll_queue.is_empty() {
130-
Some(WriteResult::Done)
131-
} else {
132-
Some(WriteResult::DataStillAvailable)
133-
}
134-
}
135-
None => {
136-
if on_empty_queue.is_empty() {
137-
Some(WriteResult::Done)
138-
} else {
139-
state.negate();
140-
None
141-
}
142-
}
143-
}
144-
}
145-
14693
impl<T> LaneItem for SupplyLane<T>
14794
where
14895
T: StructuralWritable,
@@ -153,35 +100,30 @@ where
153100
let SupplyLaneInner {
154101
sync_queue,
155102
event_queue,
156-
next,
157103
} = &mut *guard;
158104

159-
loop {
160-
match next {
161-
EventOrSync::Event => {
162-
if let Some(result) = write(
163-
buffer,
164-
event_queue,
165-
sync_queue,
166-
next,
167-
LaneResponse::<T>::event,
168-
) {
169-
return result;
170-
}
171-
}
172-
EventOrSync::Sync => {
173-
if let Some(result) = write(
174-
buffer,
175-
sync_queue,
176-
event_queue,
177-
next,
178-
LaneResponse::<T>::synced,
179-
) {
180-
return result;
181-
}
105+
match sync_queue.pop_front() {
106+
Some(remote) => {
107+
let mut encoder = ValueLaneResponseEncoder::default();
108+
encoder
109+
.encode(LaneResponse::<T>::synced(remote), buffer)
110+
.expect(INFALLIBLE_SER);
111+
}
112+
None => {
113+
if let Some(event) = event_queue.pop_front() {
114+
let mut encoder = ValueLaneResponseEncoder::default();
115+
encoder
116+
.encode(LaneResponse::event(event), buffer)
117+
.expect(INFALLIBLE_SER);
182118
}
183119
}
184120
}
121+
122+
if !event_queue.is_empty() || !sync_queue.is_empty() {
123+
WriteResult::DataStillAvailable
124+
} else {
125+
WriteResult::Done
126+
}
185127
}
186128
}
187129

server/swim_agent/src/lanes/supply/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,10 @@ fn write_multiple() {
223223
assert_eq!(
224224
messages,
225225
vec![
226-
LaneResponse::StandardEvent(13),
227226
LaneResponse::Synced(SYNC_ID),
228-
LaneResponse::StandardEvent(14),
229227
LaneResponse::Synced(SYNC_ID2),
228+
LaneResponse::StandardEvent(13),
229+
LaneResponse::StandardEvent(14),
230230
LaneResponse::StandardEvent(15),
231231
]
232232
);

0 commit comments

Comments
 (0)