Skip to content

Commit 9012627

Browse files
committed
fix(Correlator): Added handling of correlations outcome, representing the final touch to the correlator app
Signed-off-by: Charles d'Avernas <charles.davernas@neuroglia.io>
1 parent 3418d56 commit 9012627

File tree

10 files changed

+160
-13
lines changed

10 files changed

+160
-13
lines changed

src/api/Synapse.Api.Server/Usings.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1-
global using Microsoft.AspNetCore.Diagnostics;
1+
// Copyright © 2024-Present Neuroglia SRL. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
global using Microsoft.AspNetCore.Diagnostics;
215
global using Microsoft.Extensions.Options;
316
global using Neuroglia;
417
global using Neuroglia.Serialization;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright © 2024-Present Neuroglia SRL. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse;
15+
16+
/// <summary>
17+
/// Exposes default correlation outcome types
18+
/// </summary>
19+
public static class CorrelationOutcomeType
20+
{
21+
22+
/// <summary>
23+
/// Indicates that the correlation correlates to an existing workflow instance
24+
/// </summary>
25+
public const string Correlate = "correlate";
26+
/// <summary>
27+
/// Indicates that the correlation starts a new instance of a workflow
28+
/// </summary>
29+
public const string Start = "start";
30+
31+
/// <summary>
32+
/// Gets an <see cref="IEnumerable{T}"/> containing default correlation outcome types
33+
/// </summary>
34+
/// <returns>A new <see cref="IEnumerable{T}"/> containing default correlation outcome types</returns>
35+
public static IEnumerable<string> AsEnumerable()
36+
{
37+
yield return Start;
38+
yield return Correlate;
39+
}
40+
41+
}

src/core/Synapse.Core/Resources/CorrelateWorkflowOutcomeDefinition.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ public record CorrelateWorkflowOutcomeDefinition
2424
/// Gets/sets a '{name}.{namespace}' reference to the workflow instance to correlate
2525
/// </summary>
2626
[Required]
27-
[DataMember(Name = "ref", Order = 1), JsonPropertyName("ref"), JsonPropertyOrder(1), YamlMember(Alias = "ref", Order = 1)]
28-
public required virtual string Ref { get; set; }
27+
[DataMember(Name = "instance", Order = 1), JsonPropertyName("instance"), JsonPropertyOrder(1), YamlMember(Alias = "instance", Order = 1)]
28+
public required virtual string Instance { get; set; }
29+
30+
/// <summary>
31+
/// Gets/sets the name of the task that consumes the correlated events
32+
/// </summary>
33+
[DataMember(Name = "task", Order = 2), JsonPropertyName("task"), JsonPropertyOrder(2), YamlMember(Alias = "task", Order = 2)]
34+
public required virtual string Task { get; set; }
2935

3036
}

src/core/Synapse.Core/Resources/CorrelationOutcomeDefinition.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ namespace Synapse.Resources;
2020
public record CorrelationOutcomeDefinition
2121
{
2222

23+
/// <summary>
24+
/// Gets the type of the correlation outcome
25+
/// </summary>
26+
public virtual string Type => this.Start != null ? CorrelationOutcomeType.Start : this.Correlate != null ? CorrelationOutcomeType.Correlate : throw new NotSupportedException($"The specified correlation outcome type is not supported");
27+
2328
/// <summary>
2429
/// Gets/sets an object used to configure the outcome, if any, used to start a new workflow instance,. Is mutually exclusive to <see cref="Correlate"/>
2530
/// </summary>

src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,30 @@ public record WorkflowInstanceStatus
5050
[DataMember(Order = 5, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(5), YamlMember(Alias = "runs", Order = 5)]
5151
public virtual EquatableList<WorkflowRun>? Runs { get; set; }
5252

53+
/// <summary>
54+
/// Gets/sets a name/context map that contains the workflow's pending correlations
55+
/// </summary>
56+
[DataMember(Order = 6, Name = "correlations"), JsonPropertyName("correlations"), JsonPropertyOrder(6), YamlMember(Alias = "correlations", Order = 6)]
57+
public virtual EquatableDictionary<string, CorrelationContext>? Correlations { get; set; }
58+
5359
/// <summary>
5460
/// Gets/sets the error, if any, that has occurred during the workflow's execution
5561
/// </summary>
56-
[DataMember(Name = "error", Order = 6), JsonPropertyName("error"), JsonPropertyOrder(6), YamlMember(Alias = "error", Order = 6)]
62+
[DataMember(Name = "error", Order = 7), JsonPropertyName("error"), JsonPropertyOrder(7), YamlMember(Alias = "error", Order = 7)]
5763
public virtual Error? Error { get; set; }
5864

5965
/// <summary>
6066
/// Gets/sets a reference to the workflow's context data, if any
6167
/// </summary>
6268
[Required, MinLength(1)]
63-
[DataMember(Order = 7, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(7), YamlMember(Alias = "contextReference", Order = 7)]
69+
[DataMember(Order = 8, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(8), YamlMember(Alias = "contextReference", Order = 8)]
6470
public virtual string ContextReference { get; set; } = null!;
6571

6672
/// <summary>
6773
/// Gets/sets a reference to the workflow's context data, if any
6874
/// </summary>
6975
[Required, MinLength(1)]
70-
[DataMember(Order = 8, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(8), YamlMember(Alias = "outputReference", Order = 8)]
76+
[DataMember(Order = 9, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(9), YamlMember(Alias = "outputReference", Order = 9)]
7177
public virtual string? OutputReference { get; set; }
7278

7379
}

src/correlator/Synapse.Correlator/Commands/CloudEvents/IngestCloudEventCommand.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1-
namespace Synapse.Correlator.Commands.CloudEvents;
1+
// Copyright © 2024-Present Neuroglia SRL. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Correlator.Commands.CloudEvents;
215

316
/// <summary>
417
/// Represents the command used to ingest a cloud event

src/correlator/Synapse.Correlator/Controllers/CloudEventsController.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1-
namespace Synapse.Correlator.Controllers;
1+
// Copyright © 2024-Present Neuroglia SRL. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Correlator.Controllers;
215

316
/// <summary>
417
/// Represents the service used to manage <see cref="CloudEvent"/>s

src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using Json.Patch;
15+
using Neuroglia.Data.Infrastructure.ResourceOriented;
16+
1417
namespace Synapse.Correlator.Services;
1518

1619
/// <summary>
@@ -299,6 +302,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
299302
protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext context, CancellationToken cancellationToken =default)
300303
{
301304
ArgumentNullException.ThrowIfNull(context);
305+
JsonPatch patch;
302306
var originalResource = this.Correlation.Resource.Clone()!;
303307
var updatedResource = this.Correlation.Resource.Clone()!;
304308
updatedResource.Status ??= new();
@@ -317,10 +321,43 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
317321
{
318322
context.Status = CorrelationContextStatus.Completed;
319323
if (updatedResource.Spec.Lifetime == CorrelationLifetime.Ephemeral) updatedResource.Status.Phase = CorrelationStatusPhase.Completed;
320-
//todo: process the correlation's outcome
324+
WorkflowInstance? workflowInstance;
325+
switch (this.Correlation.Resource.Spec.Outcome.Type)
326+
{
327+
case CorrelationOutcomeType.Correlate:
328+
var qualifiedName = this.Correlation.Resource.Spec.Outcome.Correlate!.Instance.Trim().Split('.', StringSplitOptions.RemoveEmptyEntries);
329+
var name = qualifiedName[0];
330+
var @namespace = qualifiedName[1];
331+
workflowInstance = await this.Resources.GetAsync<WorkflowInstance>(name, @namespace, cancellationToken).ConfigureAwait(false) ?? throw new ProblemDetailsException(ResourceProblemDetails.ResourceNotFound(new ResourceReference<WorkflowInstance>(name, @namespace)));
332+
var updatedWorkflowInstance = workflowInstance.Clone()!;
333+
updatedWorkflowInstance.Status ??= new();
334+
updatedWorkflowInstance.Status.Correlations ??= [];
335+
updatedWorkflowInstance.Status.Correlations[this.Correlation.Resource.Spec.Outcome.Correlate!.Task] = context;
336+
patch = JsonPatchUtility.CreateJsonPatchFromDiff(workflowInstance, updatedWorkflowInstance);
337+
await this.Resources.PatchStatusAsync<WorkflowInstance>(new(PatchType.JsonPatch, patch), workflowInstance.GetName(), workflowInstance.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
338+
break;
339+
case CorrelationOutcomeType.Start:
340+
var input = this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? [] : await this.ExpressionEvaluator.EvaluateAsync<EquatableDictionary<string, object>>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false);
341+
workflowInstance = new()
342+
{
343+
Metadata = new()
344+
{
345+
Namespace = this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace,
346+
Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace}-"
347+
},
348+
Spec = new()
349+
{
350+
Definition = this.Correlation.Resource.Spec.Outcome.Start!.Workflow,
351+
Input = input
352+
}
353+
};
354+
await this.Resources.AddAsync(workflowInstance, false, cancellationToken).ConfigureAwait(false);
355+
break;
356+
default: throw new NotSupportedException($"The specified correlation outcome type is not supported '{this.Correlation.Resource.Spec.Outcome.Type}'");
357+
}
321358
}
322359
}
323-
var patch = JsonPatchUtility.CreateJsonPatchFromDiff(originalResource, updatedResource);
360+
patch = JsonPatchUtility.CreateJsonPatchFromDiff(originalResource, updatedResource);
324361
await this.Resources.PatchStatusAsync<Correlation>(new(PatchType.JsonPatch, patch), originalResource.GetName(), originalResource.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
325362
}
326363

src/dashboard/Synapse.Dashboard/Pages/Correlations/List/View.razor

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@
9999
string GetOutcomeTarget(Correlation correlation)
100100
{
101101
if (correlation.Spec.Outcome.Start != null) return correlation.Spec.Outcome.Start.Workflow.ToString();
102-
else return correlation.Spec.Outcome.Correlate!.Ref;
102+
else return correlation.Spec.Outcome.Correlate!.Instance;
103103
}
104104

105105
string GetOutcomeTargetRef(Correlation correlation)
106106
{
107107
if (correlation.Spec.Outcome.Start != null) return $"/workflows/{correlation.Spec.Outcome.Start.Workflow.ToString()}";
108-
else return $"/workflow-instances/{correlation.Spec.Outcome.Correlate!.Ref}";
108+
else return $"/workflow-instances/{correlation.Spec.Outcome.Correlate!.Instance}";
109109
}
110110

111111
}

tests/Synapse.UnitTests/Cases/Correlator/CorrelatorHandlerTests.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1-
using Microsoft.Extensions.Configuration;
1+
// Copyright © 2024-Present Neuroglia SRL. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
using Microsoft.Extensions.Configuration;
215
using Microsoft.Extensions.Hosting;
316
using Microsoft.Extensions.Logging;
417
using Neuroglia;

0 commit comments

Comments
 (0)