Skip to content

Commit 8e2e11d

Browse files
authored
Streams: Fix SelectAsync race condition bug (#7338)
1 parent 6f937b5 commit 8e2e11d

File tree

1 file changed

+25
-3
lines changed
  • src/core/Akka.Streams/Implementation/Fusing

1 file changed

+25
-3
lines changed

src/core/Akka.Streams/Implementation/Fusing/Ops.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2529,13 +2529,15 @@ private class Holder<T>
25292529
{
25302530
private readonly Action<Holder<T>> _callback;
25312531

2532-
public Holder(Result<T> element, Action<Holder<T>> callback)
2532+
public Holder(object message, Result<T> element, Action<Holder<T>> callback)
25332533
{
25342534
_callback = callback;
2535+
Message = message;
25352536
Element = element;
25362537
}
25372538

25382539
public Result<T> Element { get; private set; }
2540+
public object Message { get; }
25392541

25402542
public void SetElement(Result<T> result)
25412543
{
@@ -2575,7 +2577,7 @@ public override void OnPush()
25752577
try
25762578
{
25772579
var task = _stage._mapFunc(message);
2578-
var holder = new Holder<TOut>(NotYetThere, _taskCallback);
2580+
var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
25792581
_buffer.Enqueue(holder);
25802582

25812583
// We dispatch the task if it's ready to optimize away
@@ -2642,9 +2644,29 @@ private void PushOne()
26422644
}
26432645
else
26442646
{
2645-
var result = _buffer.Dequeue().Element;
2647+
var holder = _buffer.Dequeue();
2648+
var result = holder.Element;
26462649
if (!result.IsSuccess)
2650+
{
2651+
// this could happen if we are looping in PushOne and end up on a failed Task before the
2652+
// HolderCompleted callback has run
2653+
var strategy = _decider(result.Exception);
2654+
Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
2655+
switch (strategy)
2656+
{
2657+
case Directive.Stop:
2658+
FailStage(result.Exception);
2659+
return;
2660+
2661+
case Directive.Resume:
2662+
case Directive.Restart:
2663+
break;
2664+
2665+
default:
2666+
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
2667+
}
26472668
continue;
2669+
}
26482670

26492671
Push(_stage.Out, result.Value);
26502672
if (Todo < _stage._parallelism && !HasBeenPulled(inlet))

0 commit comments

Comments
 (0)