Skip to content

Commit 91a74fd

Browse files
committed
Making ParamlessRegistration's SendMessage-method more robust
1 parent f9c3aa4 commit 91a74fd

File tree

6 files changed

+62
-4
lines changed

6 files changed

+62
-4
lines changed

Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/EventSourcesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
5757
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5858
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
5959

60+
[TestMethod]
61+
public override Task MultipleMessagesCanBeAppendedOneAfterTheOther()
62+
=> MultipleMessagesCanBeAppendedOneAfterTheOther(FunctionStoreFactory.Create());
63+
6064
[TestMethod]
6165
public override Task NoOpMessageIsIgnored()
6266
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,43 @@ await registration.SendMessages(
469469
await controlPanel1.BusyWaitUntil(c => c.Status == Status.Succeeded);
470470
await controlPanel2.BusyWaitUntil(c => c.Status == Status.Succeeded);
471471
}
472+
public abstract Task MultipleMessagesCanBeAppendedOneAfterTheOther();
473+
protected async Task MultipleMessagesCanBeAppendedOneAfterTheOther(Task<IFunctionStore> functionStoreTask)
474+
{
475+
var flowType = TestFlowId.Create().Type;
476+
var functionStore = await functionStoreTask;
477+
using var registry = new FunctionsRegistry(functionStore, new Settings(messagesDefaultMaxWaitForCompletion: TimeSpan.FromSeconds(10)));
478+
var messages = new List<string>();
479+
var registration = registry.RegisterParamless(
480+
flowType,
481+
async Task (workflow) =>
482+
{
483+
await foreach (var message in workflow.Messages)
484+
{
485+
if (message is string s)
486+
await workflow.Effect.Capture(() => messages.Add(s));
487+
else
488+
return;
489+
}
490+
});
491+
492+
var instanceId = "Instance#1";
493+
494+
await registration.SendMessage(instanceId, "Hallo");
495+
await registration.SendMessage(instanceId, "World");
496+
await registration.SendMessage(instanceId, "And");
497+
await registration.SendMessage(instanceId, "Universe");
498+
await registration.SendMessage(instanceId, -1);
499+
500+
var cp = await registration.ControlPanel(instanceId).ShouldNotBeNullAsync();
501+
await cp.WaitForCompletion(allowPostponeAndSuspended: true);
502+
503+
messages.Count.ShouldBe(4);
504+
messages[0].ShouldBe("Hallo");
505+
messages[1].ShouldBe("World");
506+
messages[2].ShouldBe("And");
507+
messages[3].ShouldBe("Universe");
508+
}
472509

473510
public abstract Task NoOpMessageIsIgnored();
474511
protected async Task NoOpMessageIsIgnored(Task<IFunctionStore> functionStoreTask)

Core/Cleipnir.ResilientFunctions/ParamlessRegistration.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ParamlessRegistration : BaseRegistration
3232

3333
private readonly StateFetcher _stateFetcher;
3434
public MessageWriters MessageWriters { get; }
35+
private readonly IFunctionStore _functionStore;
3536

3637
public ParamlessRegistration(
3738
FlowType flowType,
@@ -57,6 +58,7 @@ UtcNow utcNow
5758
_controlPanelFactory = controlPanelFactory;
5859
MessageWriters = messageWriters;
5960
_stateFetcher = stateFetcher;
61+
_functionStore = functionStore;
6062
}
6163

6264
public Task<ControlPanel?> ControlPanel(FlowInstance flowInstance)
@@ -79,11 +81,14 @@ public async Task<Finding> SendMessage<T>(
7981
bool create = true,
8082
string? idempotencyKey = null) where T : notnull
8183
{
82-
var finding = await Postman.SendMessage(flowInstance.Value.ToStoredInstance(), message, idempotencyKey);
83-
if (create && finding == Finding.NotFound)
84-
await ScheduleAt(flowInstance, delayUntil: UtcNow());
84+
if (create)
85+
{
86+
var sf = await _functionStore.GetFunction(new StoredId(StoredType, flowInstance.ToStoredInstance()));
87+
if (sf is null)
88+
await Schedule(flowInstance);
89+
}
8590

86-
return finding;
91+
return await Postman.SendMessage(flowInstance.Value.ToStoredInstance(), message, idempotencyKey);
8792
}
8893

8994
public async Task SendMessages(IReadOnlyList<BatchedMessage> messages, bool interrupt = true)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessagesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
4949
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5050
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
5151

52+
[TestMethod]
53+
public override Task MultipleMessagesCanBeAppendedOneAfterTheOther()
54+
=> MultipleMessagesCanBeAppendedOneAfterTheOther(FunctionStoreFactory.Create());
55+
5256
[TestMethod]
5357
public override Task NoOpMessageIsIgnored()
5458
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessagesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
5050
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5151
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
5252

53+
[TestMethod]
54+
public override Task MultipleMessagesCanBeAppendedOneAfterTheOther()
55+
=> MultipleMessagesCanBeAppendedOneAfterTheOther(FunctionStoreFactory.Create());
56+
5357
[TestMethod]
5458
public override Task NoOpMessageIsIgnored()
5559
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessagesTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public override Task MessagesRemembersPreviousThrownEventProcessingExceptionOnAl
5050
public override Task BatchedMessagesIsDeliveredToAwaitingFlows()
5151
=> BatchedMessagesIsDeliveredToAwaitingFlows(FunctionStoreFactory.Create());
5252

53+
[TestMethod]
54+
public override Task MultipleMessagesCanBeAppendedOneAfterTheOther()
55+
=> MultipleMessagesCanBeAppendedOneAfterTheOther(FunctionStoreFactory.Create());
56+
5357
[TestMethod]
5458
public override Task NoOpMessageIsIgnored()
5559
=> NoOpMessageIsIgnored(FunctionStoreFactory.Create());

0 commit comments

Comments
 (0)