Skip to content

Commit 084a426

Browse files
committed
Added the ability to ignore NoOp messages
1 parent 1ddc872 commit 084a426

File tree

7 files changed

+49
-4
lines changed

7 files changed

+49
-4
lines changed

Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/EventSourcesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,8 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
5656
[TestMethod]
5757
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5858
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
59+
60+
[TestMethod]
61+
public override Task NoOpMessageIsIgnored()
62+
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());
5963
}

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Cleipnir.ResilientFunctions.CoreRuntime;
66
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
77
using Cleipnir.ResilientFunctions.Domain;
8+
using Cleipnir.ResilientFunctions.Domain.Events;
89
using Cleipnir.ResilientFunctions.Helpers;
910
using Cleipnir.ResilientFunctions.Messaging;
1011
using Cleipnir.ResilientFunctions.Reactive.Extensions;
@@ -468,6 +469,26 @@ await registration.SendMessages(
468469
await controlPanel1.BusyWaitUntil(c => c.Status == Status.Succeeded);
469470
await controlPanel2.BusyWaitUntil(c => c.Status == Status.Succeeded);
470471
}
472+
473+
public abstract Task NoOpMessageIsIgnored();
474+
protected async Task NoOpMessageIsIgnored(Task<IFunctionStore> functionStoreTask)
475+
{
476+
var flowType = TestFlowId.Create().Type;
477+
var functionStore = await functionStoreTask;
478+
using var registry = new FunctionsRegistry(functionStore);
479+
var registration = registry.RegisterFunc<string, string>(
480+
flowType,
481+
async Task<string> (_, workflow) => (await workflow.Messages.First(maxWait: TimeSpan.FromSeconds(10))).ToString()!
482+
);
483+
484+
var invocation = registration.Invoke("SomeInstance", "SomeParam");
485+
486+
await registration.SendMessage("SomeInstance", NoOp.Instance);
487+
await registration.SendMessage("SomeInstance", "Hallo World!");
488+
489+
var result = await invocation;
490+
result.ShouldBe("Hallo World!");
491+
}
471492

472493
private Effect CreateEffect(StoredId storedId, FlowId flowId, IFunctionStore functionStore)
473494
{
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace Cleipnir.ResilientFunctions.Domain.Events;
2+
3+
public record NoOp
4+
{
5+
public static NoOp Instance { get; } = new();
6+
}

Core/Cleipnir.ResilientFunctions/Messaging/MessagesPullerAndEmitter.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
using Cleipnir.ResilientFunctions.CoreRuntime;
77
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
88
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
9-
using Cleipnir.ResilientFunctions.Domain;
9+
using Cleipnir.ResilientFunctions.Domain.Events;
1010
using Cleipnir.ResilientFunctions.Reactive.Origin;
1111
using Cleipnir.ResilientFunctions.Storage;
1212

@@ -105,9 +105,11 @@ public async Task PullEvents(TimeSpan maxSinceLastSynced)
105105

106106
storedMessages = filterStoredMessages;
107107

108-
var events = storedMessages.Select(
109-
storedEvent => _serializer.DeserializeMessage(storedEvent.MessageContent, storedEvent.MessageType)
110-
);
108+
var events = storedMessages
109+
.Select(
110+
storedEvent => _serializer.DeserializeMessage(storedEvent.MessageContent, storedEvent.MessageType)
111+
)
112+
.Where(@event => @event is not NoOp);
111113

112114
Source.SignalNext(events);
113115
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessagesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,8 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
4848
[TestMethod]
4949
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5050
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
51+
52+
[TestMethod]
53+
public override Task NoOpMessageIsIgnored()
54+
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());
5155
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessagesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,8 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
4949
[TestMethod]
5050
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5151
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
52+
53+
[TestMethod]
54+
public override Task NoOpMessageIsIgnored()
55+
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());
5256
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessagesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,8 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
4949
[TestMethod]
5050
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5151
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
52+
53+
[TestMethod]
54+
public override Task NoOpMessageIsIgnored()
55+
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());
5256
}

0 commit comments

Comments
 (0)