Skip to content

Commit c244b88

Browse files
authored
Merge pull request #333 from neuroglia-io/fix-produce-event-activity
- Fixed the ProduceEventProcessor to publish the event using the newly added PublishEvent context method - Fixed the ProduceEventProcessor to publish the event using the newly added PublishEvent context method - Fixed the OAuth2TokenManager to convert token request properties to snake case
2 parents 28d954a + 55674e4 commit c244b88

File tree

23 files changed

+353
-1124
lines changed

23 files changed

+353
-1124
lines changed

src/apis/runtime/Synapse.Apis.Runtime.Core/Services/ISynapseRuntimeApi.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
using Neuroglia.Serialization;
1818
using ServerlessWorkflow.Sdk.Models;
19+
using Synapse.Integration.Commands.Events;
1920
using Synapse.Integration.Commands.Generic;
2021
using Synapse.Integration.Commands.WorkflowActivities;
2122
using Synapse.Integration.Commands.WorkflowInstances;
@@ -76,6 +77,15 @@ public interface ISynapseRuntimeApi
7677
[OperationContract]
7778
Task SetCorrelationMappingAsync(V1SetWorkflowInstanceCorrelationMappingCommand command, CancellationToken cancellationToken = default);
7879

80+
/// <summary>
81+
/// Publishes the specified <see cref="V1Event"/>
82+
/// </summary>
83+
/// <param name="command">The object that describes the command to execute</param>
84+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
85+
/// <returns>A new awaitable <see cref="Task"/></returns>
86+
[OperationContract]
87+
Task PublishEventAsync(V1PublishEventCommand command, CancellationToken cancellationToken = default);
88+
7989
/// <summary>
8090
/// Gets all the activities (including non-operative ones) of the specified workflow instance
8191
/// </summary>

src/apis/runtime/Synapse.Apis.Runtime.Grpc.Client/Services/ISynapseGrpcRuntimeApi.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using ProtoBuf.Grpc;
2020
using ServerlessWorkflow.Sdk.Models;
2121
using Synapse.Apis.Runtime.Grpc.Models;
22+
using Synapse.Integration.Commands.Events;
2223
using Synapse.Integration.Commands.WorkflowActivities;
2324
using Synapse.Integration.Commands.WorkflowInstances;
2425
using Synapse.Integration.Models;
@@ -80,6 +81,15 @@ public interface ISynapseGrpcRuntimeApi
8081
[OperationContract]
8182
Task<GrpcApiResult<V1WorkflowInstance>> SetCorrelationMappingAsync(V1SetWorkflowInstanceCorrelationMappingCommand command, CallContext context = default);
8283

84+
/// <summary>
85+
/// Publishes the specified <see cref="V1Event"/>
86+
/// </summary>
87+
/// <param name="command">The object that describes the command to execute</param>
88+
/// <param name="context">The current <see cref="CallContext"/></param>
89+
/// <returns>A new awaitable <see cref="Task"/></returns>
90+
[OperationContract]
91+
Task<GrpcApiResult> PublishEventAsync(V1PublishEventCommand command, CallContext context = default);
92+
8393
/// <summary>
8494
/// Gets the activities (including non-operative ones) of the specified workflow instance
8595
/// </summary>

src/apis/runtime/Synapse.Apis.Runtime.Grpc.Client/Services/SynapseGrpcRuntimeApiClient.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using Microsoft.Extensions.Logging;
1919
using Neuroglia;
2020
using Neuroglia.Serialization;
21+
using Synapse.Integration.Commands.Events;
2122
using Synapse.Integration.Commands.Generic;
2223
using Synapse.Integration.Commands.WorkflowActivities;
2324
using Synapse.Integration.Commands.WorkflowInstances;
@@ -87,11 +88,17 @@ public virtual async Task<V1WorkflowInstance> StartAsync(string workflowInstance
8788
public virtual async Task<V1Event?> ConsumeOrBeginCorrelateEventAsync(V1ConsumeWorkflowInstancePendingEventCommand command, CancellationToken cancellationToken = default)
8889
{
8990
var result = await this.RuntimeApi.ConsumeOrBeginCorrelateEventAsync(command, cancellationToken);
90-
if (!result.Succeeded)
91-
throw new OperationResultException(new OperationResult(result.Code, result.Errors?.Select(e => new Neuroglia.Error(e.Code, e.Message))?.ToArray()));
91+
if (!result.Succeeded) throw new OperationResultException(new OperationResult(result.Code, result.Errors?.Select(e => new Neuroglia.Error(e.Code, e.Message))?.ToArray()));
9292
return result.Data!;
9393
}
9494

95+
/// <inheritdoc/>
96+
public virtual async Task PublishEventAsync(V1PublishEventCommand command, CancellationToken cancellationToken = default)
97+
{
98+
var result = await this.RuntimeApi.PublishEventAsync(command, cancellationToken);
99+
if (!result.Succeeded) throw new OperationResultException(new OperationResult(result.Code, result.Errors?.Select(e => new Neuroglia.Error(e.Code, e.Message))?.ToArray()));
100+
}
101+
95102
/// <inheritdoc/>
96103
public virtual async Task<List<V1WorkflowActivity>> GetActivitiesAsync(string workflowInstanceId, CancellationToken cancellationToken = default)
97104
{

src/apis/runtime/Synapse.Apis.Runtime.Grpc/Services/SynapseGrpcRuntimeApi.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using ProtoBuf.Grpc;
2323
using Synapse.Apis.Runtime.Grpc.Models;
2424
using Synapse.Infrastructure.Services;
25+
using Synapse.Integration.Commands.Events;
2526
using Synapse.Integration.Commands.WorkflowActivities;
2627
using Synapse.Integration.Commands.WorkflowInstances;
2728
using Synapse.Integration.Models;
@@ -103,6 +104,12 @@ public virtual async Task<GrpcApiResult<V1WorkflowInstance>> StartAsync(string w
103104
return GrpcApiResult.CreateFor(await this.Mediator.ExecuteAsync(Mapper.Map<Application.Commands.WorkflowInstances.V1ConsumeOrBeginCorrelateEventCommand>(command), context.CancellationToken));
104105
}
105106

107+
/// <inheritdoc/>
108+
public virtual async Task<GrpcApiResult> PublishEventAsync(V1PublishEventCommand command, CallContext context = default)
109+
{
110+
return GrpcApiResult.CreateFor(await this.Mediator.ExecuteAsync(Mapper.Map<Application.Commands.Events.V1PublishEventCommand>(command), context.CancellationToken));
111+
}
112+
106113
/// <inheritdoc/>
107114
public virtual async Task<GrpcApiResult<bool>> TryCorrelateAsync(V1TryCorrelateWorkflowInstanceCommand command, CallContext context = default)
108115
{

src/apis/runtime/Synapse.Apis.Runtime.Ipc.Client/Services/SynapseIpcRuntimeApiClient.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Neuroglia.Mediation;
55
using Neuroglia.Serialization;
66
using Synapse.Infrastructure.Services;
7+
using Synapse.Integration.Commands.Events;
78
using Synapse.Integration.Commands.WorkflowActivities;
89
using Synapse.Integration.Commands.WorkflowInstances;
910
using Synapse.Integration.Models;
@@ -93,6 +94,13 @@ public virtual async Task<V1WorkflowInstance> StartAsync(string workflowInstance
9394
return await this.Mediator.ExecuteAndUnwrapAsync(this.Mapper.Map<Application.Commands.WorkflowInstances.V1ConsumeOrBeginCorrelateEventCommand>(command), cancellationToken);
9495
}
9596

97+
/// <inheritdoc/>
98+
public virtual async Task PublishEventAsync(V1PublishEventCommand command, CancellationToken cancellationToken = default)
99+
{
100+
if (command == null) throw new ArgumentNullException(nameof(command));
101+
await this.Mediator.ExecuteAndUnwrapAsync(this.Mapper.Map<Application.Commands.Events.V1PublishEventCommand>(command), cancellationToken);
102+
}
103+
96104
/// <inheritdoc/>
97105
public virtual async Task<List<V1WorkflowActivity>> GetActivitiesAsync(string workflowInstanceId, CancellationToken cancellationToken = default)
98106
{

src/apps/Synapse.Server/appsettings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
},
1515
"CloudEvents": {
1616
"Sink": {
17-
"Uri": "https://webhook.site/0d0f64a9-fc21-49bb-a2a9-e230929dcc0f"
17+
"Uri": "https://webhook.site/2e78793f-69a5-4ef6-8fdf-d467a9d6f1c6"
1818
}
1919
}
2020
}

src/apps/Synapse.Worker/Services/IWorkflowRuntimeContext.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public interface IWorkflowRuntimeContext
5959
/// <returns>The secret with the specified name</returns>
6060
Task<T> GetSecretAsync<T>(string secret, CancellationToken cancellationToken = default);
6161

62+
/// <summary>
63+
/// Publishes the specified <see cref="V1Event"/>
64+
/// </summary>
65+
/// <param name="e">The <see cref="V1Event"/> to publish</param>
66+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
67+
/// <returns>A new awaitable <see cref="Task"/></returns>
68+
Task PublishEventAsync(V1Event e, CancellationToken cancellationToken = default);
69+
6270
}
6371

6472
}

src/apps/Synapse.Worker/Services/OAuth2TokenManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*
1616
*/
1717

18+
using GraphQL.Client.Abstractions.Utilities;
1819
using IdentityModel.Client;
1920

2021
namespace Synapse.Worker.Services
@@ -68,7 +69,7 @@ public virtual async Task<OAuth2Token> GetTokenAsync(OAuth2AuthenticationPropert
6869
var tokenKey = $"{oauthProperties.ClientId}@{oauthProperties.Authority}";
6970
var json = await this.JsonSerializer.SerializeAsync(oauthProperties, cancellationToken);
7071
var properties = await this.JsonSerializer.DeserializeAsync<Dictionary<string, string>>(json, cancellationToken);
71-
properties = properties.Where(kvp => !string.IsNullOrWhiteSpace(kvp.Value)).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
72+
properties = properties.Where(kvp => !string.IsNullOrWhiteSpace(kvp.Value)).ToDictionary(kvp => kvp.Key.ToSnakeCase(), kvp => kvp.Value);
7273
properties.Remove("authority");
7374
if(this.Tokens.TryGetValue(tokenKey, out var token)
7475
&& token != null)

src/apps/Synapse.Worker/Services/Processors/ProduceEventProcessor.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*
1616
*/
1717

18-
using CloudNative.CloudEvents;
1918
using Synapse.Integration.Events.WorkflowActivities;
2019
using System.Net.Mime;
2120

@@ -67,13 +66,14 @@ protected override async Task ProcessAsync(CancellationToken cancellationToken)
6766
{
6867
try
6968
{
70-
var e = new CloudEvent()
69+
var e = new V1Event()
7170
{
7271
Id = Guid.NewGuid().ToString(),
7372
Type = this.EventDefinition.Type,
7473
Source = new(this.EventDefinition.Source!),
7574
DataContentType = MediaTypeNames.Application.Json,
76-
Data = this.Activity.Input
75+
Data = this.Activity.Input,
76+
ExtensionAttributes = new()
7777
};
7878
if(this.EventDefinition.Correlations != null)
7979
{
@@ -91,10 +91,12 @@ protected override async Task ProcessAsync(CancellationToken cancellationToken)
9191
value = (await this.Context.EvaluateAsync(value, await this.Context.Workflow.GetActivityStateDataAsync(this.Activity, cancellationToken), cancellationToken)!)!.ToString();
9292
await this.Context.Workflow.SetCorrelationMappingAsync(correlation.ContextAttributeName, value!, cancellationToken);
9393
}
94-
e.SetAttributeFromString(correlation.ContextAttributeName, value!);
94+
e.ExtensionAttributes.Add(correlation.ContextAttributeName, Dynamic.FromObject(value));
9595
}
9696
}
97-
this.IntegrationEventBus.OutboundStream.OnNext(e);
97+
98+
await this.Context.PublishEventAsync(e, cancellationToken);
99+
98100
await this.OnNextAsync(new V1WorkflowActivityCompletedIntegrationEvent(this.Activity.Id, this.Activity.Input), cancellationToken);
99101
await this.OnCompletedAsync(cancellationToken);
100102
}

src/apps/Synapse.Worker/Services/WorkflowRuntimeContext.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,13 @@ public virtual async Task<T> GetSecretAsync<T>(string secret, CancellationToken
171171
};
172172
}
173173

174+
/// <inheritdoc/>
175+
public virtual async Task PublishEventAsync(V1Event e, CancellationToken cancellationToken = default)
176+
{
177+
if (e == null) throw new ArgumentNullException(nameof(e));
178+
await this.RuntimeApi.PublishEventAsync(new() { Event = e }, cancellationToken);
179+
}
180+
174181
/// <summary>
175182
/// Builds the runtime expression arguments
176183
/// </summary>

0 commit comments

Comments
 (0)