Skip to content

Commit 344ae49

Browse files
authored
Merge pull request #3545 from BalmungSan/fix-flow-stream-subscriber
Be more deffensive with the `StreamSubscriber` internal state
2 parents 4fabeb1 + 672d0dc commit 344ae49

File tree

1 file changed

+21
-22
lines changed

1 file changed

+21
-22
lines changed

core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ private[flow] final class StreamSubscriber[F[_], A] private (
6666
* since they are always done on the effect run after the state update took place.
6767
* Meaning this should be correct if the Producer is well-behaved.
6868
*/
69-
private var inOnNextLoop: Boolean = _
69+
private var inOnNextLoop: Boolean = false
7070
private var buffer: Array[Any] = null
71-
private var index: Int = _
71+
private var index: Int = 0
7272

7373
/** Receives the next record from the upstream reactive-streams system. */
7474
override final def onNext(a: A): Unit = {
@@ -164,10 +164,10 @@ private[flow] final class StreamSubscriber[F[_], A] private (
164164
state -> run {
165165
// We do the updates here,
166166
// to ensure they happen after we have secured the state.
167-
inOnNextLoop = true
168-
index = 1
169167
buffer = new Array(chunkSize)
170168
buffer(0) = a
169+
index = 1
170+
inOnNextLoop = true
171171
}
172172
}
173173

@@ -188,15 +188,18 @@ private[flow] final class StreamSubscriber[F[_], A] private (
188188
Idle(s) -> run {
189189
// We do the updates here,
190190
// to ensure they happen after we have secured the state.
191-
cb.apply(Right(Some(Chunk.array(buffer))))
191+
val chunk = Chunk.array(buffer)
192192
inOnNextLoop = false
193193
buffer = null
194+
cb.apply(Right(Some(chunk)))
194195
}
195196

196197
case state =>
197198
Failed(
198199
new InvalidStateException(operation = s"Received record [${buffer.last}]", state)
199200
) -> run {
201+
// We do the updates here,
202+
// to ensure they happen after we have secured the state.
200203
inOnNextLoop = false
201204
buffer = null
202205
}
@@ -206,19 +209,15 @@ private[flow] final class StreamSubscriber[F[_], A] private (
206209
case Uninitialized(Some(cb)) =>
207210
Terminal -> run {
208211
cb.apply(Left(ex))
209-
// We do the updates here,
210-
// to ensure they happen after we have secured the state.
211-
inOnNextLoop = false
212-
buffer = null
213212
}
214213

215214
case WaitingOnUpstream(cb, _) =>
216215
Terminal -> run {
217-
cb.apply(Left(ex))
218216
// We do the updates here,
219217
// to ensure they happen after we have secured the state.
220218
inOnNextLoop = false
221219
buffer = null
220+
cb.apply(Left(ex))
222221
}
223222

224223
case _ =>
@@ -240,17 +239,21 @@ private[flow] final class StreamSubscriber[F[_], A] private (
240239

241240
case WaitingOnUpstream(cb, s) =>
242241
Terminal -> run {
242+
// We do the updates here,
243+
// to ensure they happen after we have secured the state.
243244
if (canceled) {
244245
s.cancel()
246+
inOnNextLoop = false
247+
buffer = null
245248
cb.apply(Right(None))
246-
} else if (index == 0) {
249+
} else if (buffer eq null) {
250+
inOnNextLoop = false
247251
cb.apply(Right(None))
248252
} else {
249-
cb.apply(Right(Some(Chunk.array(buffer, offset = 0, length = index))))
250-
// We do the updates here,
251-
// to ensure they happen after we have secured the state.
253+
val chunk = Chunk.array(buffer, offset = 0, length = index)
252254
inOnNextLoop = false
253255
buffer = null
256+
cb.apply(Right(Some(chunk)))
254257
}
255258
}
256259

@@ -268,10 +271,6 @@ private[flow] final class StreamSubscriber[F[_], A] private (
268271
case Idle(s) =>
269272
WaitingOnUpstream(cb, s) -> run {
270273
s.request(chunkSize.toLong)
271-
// We do the updates here,
272-
// to ensure they happen after we have secured the state.
273-
inOnNextLoop = false
274-
index = 0
275274
}
276275

277276
case state @ Uninitialized(Some(otherCB)) =>
@@ -283,14 +282,14 @@ private[flow] final class StreamSubscriber[F[_], A] private (
283282

284283
case state @ WaitingOnUpstream(otherCB, s) =>
285284
Terminal -> run {
286-
s.cancel()
287-
val ex = Left(new InvalidStateException(operation = "Received request", state))
288-
otherCB.apply(ex)
289-
cb.apply(ex)
290285
// We do the updates here,
291286
// to ensure they happen after we have secured the state.
292287
inOnNextLoop = false
293288
buffer = null
289+
s.cancel()
290+
val ex = Left(new InvalidStateException(operation = "Received request", state))
291+
otherCB.apply(ex)
292+
cb.apply(ex)
294293
}
295294

296295
case Failed(ex) =>

0 commit comments

Comments
 (0)