Skip to content

Commit e46a135

Browse files
committed
Added Owner to ControlPanel
1 parent b4f3010 commit e46a135

File tree

11 files changed

+81
-19
lines changed

11 files changed

+81
-19
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SunshineTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,8 @@ public override Task ParamlessCanBeCreatedWithInitialFailedEffect()
9191
[TestMethod]
9292
public override Task FunctionCanAcceptAndReturnOptionType()
9393
=> FunctionCanAcceptAndReturnOptionType(FunctionStoreFactory.Create());
94+
95+
[TestMethod]
96+
public override Task ExecutingFunctionHasOwner()
97+
=> ExecutingFunctionHasOwner(FunctionStoreFactory.Create());
9498
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SunshineTests.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,4 +716,40 @@ public async Task FunctionCanAcceptAndReturnOptionType(Task<IFunctionStore> stor
716716

717717
unhandledExceptionHandler.ShouldNotHaveExceptions();
718718
}
719+
720+
public abstract Task ExecutingFunctionHasOwner();
721+
public async Task ExecutingFunctionHasOwner(Task<IFunctionStore> storeTask)
722+
{
723+
var store = await storeTask;
724+
var flowId = TestFlowId.Create();
725+
726+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
727+
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionHandler.Catch));
728+
729+
var insideFlag = new SyncedFlag();
730+
var completeFlag = new SyncedFlag();
731+
var registration = functionsRegistry
732+
.RegisterAction(
733+
flowId.Type,
734+
inner: async Task (string _) =>
735+
{
736+
insideFlag.Raise();
737+
await completeFlag.WaitForRaised();
738+
}
739+
);
740+
var flowTask = registration.Invoke(flowId.Instance, "param");
741+
await insideFlag.WaitForRaised();
742+
743+
var cp = await registration.ControlPanel(flowId.Instance).ShouldNotBeNullAsync();
744+
cp.Owner.ShouldNotBeNull();
745+
cp.Owner.ShouldBe(functionsRegistry.ClusterInfo.ReplicaId);
746+
747+
completeFlag.Raise();
748+
await flowTask;
749+
750+
await cp.Refresh();
751+
//todo cp.Owner.ShouldBeNull();
752+
753+
unhandledExceptionHandler.ShouldNotHaveExceptions();
754+
}
719755
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/FunctionState.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ public record FunctionState<TParam, TReturn>(
66
Status Status,
77
int Epoch,
88
long Expires,
9+
ReplicaId? Owner,
910
TParam? Param,
1011
TReturn? Result,
1112
FatalWorkflowException? FatalWorkflowException

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
363363
sf.Status,
364364
sf.Epoch,
365365
sf.Expires,
366+
sf.OwnerId,
366367
Param:
367368
sf.Parameter == null
368369
? default

Core/Cleipnir.ResilientFunctions/Domain/ClusterInfo.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public int Offset
1414
lock (_sync)
1515
return _offset;
1616
}
17-
set
17+
internal set
1818
{
1919
lock (_sync)
2020
_offset = value;
@@ -29,7 +29,7 @@ public int ReplicaCount
2929
lock (_sync)
3030
return _replicaCount;
3131
}
32-
set
32+
internal set
3333
{
3434
lock (_sync)
3535
_replicaCount = value;

Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ internal ControlPanel(
1515
Invoker<Unit, Unit> invoker,
1616
InvocationHelper<Unit, Unit> invocationHelper,
1717
FlowId flowId, StoredId storedId,
18+
ReplicaId? owner,
1819
Status status, int epoch, long expires,
1920
ExistingEffects effects,
2021
ExistingStates states, ExistingMessages messages, ExistingSemaphores semaphores,
@@ -23,7 +24,7 @@ internal ControlPanel(
2324
UtcNow utcNow
2425
) : base(
2526
invoker, invocationHelper,
26-
flowId, storedId, status, epoch,
27+
flowId, storedId, owner, status, epoch,
2728
expires, innerParam: Unit.Instance, innerResult: Unit.Instance, effects,
2829
states, messages, registeredTimeouts, semaphores, correlations, fatalWorkflowException,
2930
utcNow
@@ -58,6 +59,7 @@ internal ControlPanel(
5859
Invoker<TParam, Unit> invoker,
5960
InvocationHelper<TParam, Unit> invocationHelper,
6061
FlowId flowId, StoredId storedId,
62+
ReplicaId? owner,
6163
Status status, int epoch, long expires, TParam innerParam,
6264
ExistingEffects effects,
6365
ExistingStates states, ExistingMessages messages, ExistingSemaphores semaphores,
@@ -66,7 +68,7 @@ internal ControlPanel(
6668
UtcNow utcNow
6769
) : base(
6870
invoker, invocationHelper,
69-
flowId, storedId, status, epoch,
71+
flowId, storedId, owner, status, epoch,
7072
expires, innerParam, innerResult: Unit.Instance, effects,
7173
states, messages, registeredTimeouts, semaphores, correlations, fatalWorkflowException,
7274
utcNow
@@ -106,15 +108,15 @@ public class ControlPanel<TParam, TReturn> : BaseControlPanel<TParam, TReturn> w
106108
internal ControlPanel(
107109
Invoker<TParam, TReturn> invoker,
108110
InvocationHelper<TParam, TReturn> invocationHelper,
109-
FlowId flowId, StoredId storedId, Status status, int epoch,
111+
FlowId flowId, StoredId storedId, ReplicaId? owner, Status status, int epoch,
110112
long expires, TParam innerParam,
111113
TReturn? innerResult,
112114
ExistingEffects effects, ExistingStates states, ExistingMessages messages, ExistingSemaphores semaphores,
113115
ExistingRegisteredTimeouts registeredTimeouts, Correlations correlations, FatalWorkflowException? fatalWorkflowException,
114116
UtcNow utcNow
115117
) : base(
116118
invoker, invocationHelper,
117-
flowId, storedId, status, epoch, expires,
119+
flowId, storedId, owner, status, epoch, expires,
118120
innerParam, innerResult, effects, states, messages,
119121
registeredTimeouts, semaphores, correlations, fatalWorkflowException,
120122
utcNow
@@ -161,6 +163,7 @@ internal BaseControlPanel(
161163
InvocationHelper<TParam, TReturn> invocationHelper,
162164
FlowId flowId,
163165
StoredId storedId,
166+
ReplicaId? owner,
164167
Status status,
165168
int epoch,
166169
long expires,
@@ -179,6 +182,7 @@ internal BaseControlPanel(
179182
_invocationHelper = invocationHelper;
180183
FlowId = flowId;
181184
StoredId = storedId;
185+
Owner = owner;
182186
Status = status;
183187
Epoch = epoch;
184188
LeaseExpiration = expires == long.MaxValue
@@ -200,6 +204,7 @@ internal BaseControlPanel(
200204

201205
public FlowId FlowId { get; }
202206
public StoredId StoredId { get; }
207+
public ReplicaId? Owner { get; }
203208
public Status Status { get; private set; }
204209
protected UtcNow UtcNow { get; }
205210

Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<U
3737
_invocationHelper,
3838
flowId,
3939
storedId,
40+
functionState.Owner,
4041
functionState.Status,
4142
functionState.Epoch,
4243
functionState.Expires,
@@ -84,6 +85,7 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
8485
_invocationHelper,
8586
flowId,
8687
storedId,
88+
functionState.Owner,
8789
functionState.Status,
8890
functionState.Epoch,
8991
functionState.Expires,
@@ -121,8 +123,8 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
121123
{
122124
var flowId = new FlowId(_flowType, flowInstance);
123125
var storedId = new StoredId(_storedType, flowInstance.Value.ToStoredInstance());
124-
var f = await _invocationHelper.GetFunction(storedId, flowId);
125-
if (f == null)
126+
var functionState = await _invocationHelper.GetFunction(storedId, flowId);
127+
if (functionState == null)
126128
return null;
127129

128130
var existingEffects = _invocationHelper.CreateExistingEffects(flowId);
@@ -131,18 +133,19 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker<T
131133
_invocationHelper,
132134
flowId,
133135
storedId,
134-
f.Status,
135-
f.Epoch,
136-
f.Expires,
137-
f.Param!,
138-
f.Result,
136+
functionState.Owner,
137+
functionState.Status,
138+
functionState.Epoch,
139+
functionState.Expires,
140+
functionState.Param!,
141+
functionState.Result,
139142
existingEffects,
140143
_invocationHelper.CreateExistingStates(flowId),
141144
_invocationHelper.CreateExistingMessages(flowId),
142145
_invocationHelper.CreateExistingSemaphores(flowId),
143146
_invocationHelper.CreateExistingTimeouts(flowId, existingEffects),
144147
_invocationHelper.CreateCorrelations(flowId),
145-
f.FatalWorkflowException,
148+
functionState.FatalWorkflowException,
146149
_utcNow
147150
);
148151
}

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class FunctionsRegistry : IDisposable
2828
private readonly LeasesUpdater _leasesUpdater;
2929
private readonly StoredTypes _storedTypes;
3030

31-
private readonly ClusterInfo _clusterInfo;
31+
public ClusterInfo ClusterInfo { get; }
3232

3333
private volatile bool _disposed;
3434
private readonly Lock _sync = new();
@@ -61,7 +61,7 @@ public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null
6161
utcNow
6262
);
6363

64-
_clusterInfo = new ClusterInfo(ReplicaId.NewId());
64+
ClusterInfo = new ClusterInfo(ReplicaId.NewId());
6565
}
6666

6767
#region Func overloads
@@ -225,7 +225,7 @@ public FuncRegistration<TParam, TReturn> RegisterFunc<TParam, TReturn>(
225225
invocationHelper,
226226
settingsWithDefaults.UnhandledExceptionHandler,
227227
_functionStore.Utilities,
228-
_clusterInfo.ReplicaId
228+
ClusterInfo.ReplicaId
229229
);
230230

231231
WatchDogsFactory.CreateAndStart(
@@ -322,7 +322,7 @@ private ParamlessRegistration RegisterParamless(
322322
invocationHelper,
323323
settingsWithDefaults.UnhandledExceptionHandler,
324324
_functionStore.Utilities,
325-
_clusterInfo.ReplicaId
325+
ClusterInfo.ReplicaId
326326
);
327327

328328
WatchDogsFactory.CreateAndStart(
@@ -419,7 +419,7 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
419419
invocationHelper,
420420
settingsWithDefaults.UnhandledExceptionHandler,
421421
_functionStore.Utilities,
422-
_clusterInfo.ReplicaId
422+
ClusterInfo.ReplicaId
423423
);
424424

425425
WatchDogsFactory.CreateAndStart(

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SunshineTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,8 @@ public override Task ParamlessCanBeCreatedWithInitialFailedEffect()
8888
[TestMethod]
8989
public override Task FunctionCanAcceptAndReturnOptionType()
9090
=> FunctionCanAcceptAndReturnOptionType(FunctionStoreFactory.Create());
91+
92+
[TestMethod]
93+
public override Task ExecutingFunctionHasOwner()
94+
=> ExecutingFunctionHasOwner(FunctionStoreFactory.Create());
9195
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SunshineTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,8 @@ public override Task ParamlessCanBeCreatedWithInitialFailedEffect()
8989
[TestMethod]
9090
public override Task FunctionCanAcceptAndReturnOptionType()
9191
=> FunctionCanAcceptAndReturnOptionType(FunctionStoreFactory.Create());
92+
93+
[TestMethod]
94+
public override Task ExecutingFunctionHasOwner()
95+
=> ExecutingFunctionHasOwner(FunctionStoreFactory.Create());
9296
}

0 commit comments

Comments
 (0)