1
+ // Copyright © 2024-Present The Synapse Authors
2
+ //
3
+ // Licensed under the Apache License, Version 2.0 (the "License"),
4
+ // you may not use this file except in compliance with the License.
5
+ // You may obtain a copy of the License at
6
+ // http://www.apache.org/licenses/LICENSE-2.0
7
+ //
8
+ // Unless required by applicable law or agreed to in writing, software
9
+ // distributed under the License is distributed on an "AS IS" BASIS,
10
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
+ // See the License for the specific language governing permissions and
12
+ // limitations under the License.
13
+
14
+ using Neuroglia ;
15
+ using Neuroglia . AsyncApi ;
16
+ using Neuroglia . AsyncApi . Client ;
17
+ using Neuroglia . AsyncApi . Client . Services ;
18
+ using Neuroglia . AsyncApi . IO ;
19
+ using Neuroglia . AsyncApi . v3 ;
20
+ using Neuroglia . Data . Expressions ;
21
+
22
+ namespace Synapse . Runner . Services . Executors ;
23
+
24
+ /// <summary>
25
+ /// Represents an <see cref="ITaskExecutor"/> used to execute AsyncAPI <see cref="CallTaskDefinition"/>s using an <see cref="HttpClient"/>
26
+ /// </summary>
27
+ /// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
28
+ /// <param name="logger">The service used to perform logging</param>
29
+ /// <param name="executionContextFactory">The service used to create <see cref="ITaskExecutionContext"/>s</param>
30
+ /// <param name="executorFactory">The service used to create <see cref="ITaskExecutor"/>s</param>
31
+ /// <param name="context">The current <see cref="ITaskExecutionContext"/></param>
32
+ /// <param name="schemaHandlerProvider">The service used to provide <see cref="Core.Infrastructure.Services.ISchemaHandler"/> implementations</param>
33
+ /// <param name="serializer">The service used to serialize/deserialize objects to/from JSON</param>
34
+ /// <param name="httpClientFactory">The service used to create <see cref="HttpClient"/>s</param>
35
+ /// <param name="asyncApiDocumentReader">The service used to read <see cref="IAsyncApiDocument"/>s</param>
36
+ /// <param name="asyncApiClientFactory">The service used to create <see cref="IAsyncApiClient"/>s</param>
37
+ public class AsyncApiCallExecutor ( IServiceProvider serviceProvider , ILogger < AsyncApiCallExecutor > logger , ITaskExecutionContextFactory executionContextFactory , ITaskExecutorFactory executorFactory ,
38
+ ITaskExecutionContext < CallTaskDefinition > context , Core . Infrastructure . Services . ISchemaHandlerProvider schemaHandlerProvider , IJsonSerializer serializer , IHttpClientFactory httpClientFactory , IAsyncApiDocumentReader asyncApiDocumentReader , IAsyncApiClientFactory asyncApiClientFactory )
39
+ : TaskExecutor < CallTaskDefinition > ( serviceProvider , logger , executionContextFactory , executorFactory , context , schemaHandlerProvider , serializer )
40
+ {
41
+
42
+ /// <summary>
43
+ /// Gets the service used to create <see cref="HttpClient"/>s
44
+ /// </summary>
45
+ protected IHttpClientFactory HttpClientFactory { get ; } = httpClientFactory ;
46
+
47
+ /// <summary>
48
+ /// Gets the service used to read <see cref="IAsyncApiDocument"/>s
49
+ /// </summary>
50
+ protected IAsyncApiDocumentReader AsyncApiDocumentReader { get ; } = asyncApiDocumentReader ;
51
+
52
+ /// <summary>
53
+ /// Gets the service used to create <see cref="IAsyncApiClient"/>s
54
+ /// </summary>
55
+ protected IAsyncApiClientFactory AsyncApiClientFactory { get ; } = asyncApiClientFactory ;
56
+
57
+ /// <summary>
58
+ /// Gets the definition of the AsyncAPI call to perform
59
+ /// </summary>
60
+ protected AsyncApiCallDefinition ? AsyncApi { get ; set ; }
61
+
62
+ /// <summary>
63
+ /// Gets/sets the <see cref="IAsyncApiDocument"/> that defines the AsyncAPI operation to call
64
+ /// </summary>
65
+ protected V3AsyncApiDocument ? Document { get ; set ; }
66
+
67
+ /// <summary>
68
+ /// Gets the <see cref="V3OperationDefinition"/> to call
69
+ /// </summary>
70
+ protected KeyValuePair < string , V3OperationDefinition > Operation { get ; set ; }
71
+
72
+ /// <summary>
73
+ /// Gets an object used to describe the credentials, if any, used to authenticate a user agent with the AsyncAPI application
74
+ /// </summary>
75
+ protected AuthorizationInfo ? Authorization { get ; set ; }
76
+
77
+ /// <summary>
78
+ /// Gets/sets the payload, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
79
+ /// </summary>
80
+ protected object ? MessagePayload { get ; set ; }
81
+
82
+ /// <summary>
83
+ /// Gets/sets the headers, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
84
+ /// </summary>
85
+ protected object ? MessageHeaders { get ; set ; }
86
+
87
+ /// <inheritdoc/>
88
+ protected override async Task DoInitializeAsync ( CancellationToken cancellationToken )
89
+ {
90
+ this . AsyncApi = ( AsyncApiCallDefinition ) this . JsonSerializer . Convert ( this . Task . Definition . With , typeof ( AsyncApiCallDefinition ) ) ! ;
91
+ using var httpClient = this . HttpClientFactory . CreateClient ( ) ;
92
+ await httpClient . ConfigureAuthenticationAsync ( this . AsyncApi . Document . Endpoint . Authentication , this . ServiceProvider , this . Task . Workflow . Definition , cancellationToken ) . ConfigureAwait ( false ) ;
93
+ var uriString = StringFormatter . NamedFormat ( this . AsyncApi . Document . EndpointUri . OriginalString , this . Task . Input . ToDictionary ( ) ) ;
94
+ if ( uriString . IsRuntimeExpression ( ) ) uriString = await this . Task . Workflow . Expressions . EvaluateAsync < string > ( uriString , this . Task . Input , this . GetExpressionEvaluationArguments ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
95
+ if ( string . IsNullOrWhiteSpace ( uriString ) ) throw new NullReferenceException ( "The AsyncAPI endpoint URI cannot be null or empty" ) ;
96
+ if ( ! Uri . TryCreate ( uriString , UriKind . RelativeOrAbsolute , out var uri ) || uri == null ) throw new Exception ( $ "Failed to parse the specified string '{ uriString } ' into a new URI") ;
97
+ using var request = new HttpRequestMessage ( HttpMethod . Get , uriString ) ;
98
+ using var response = await httpClient . SendAsync ( request , cancellationToken ) . ConfigureAwait ( false ) ;
99
+ if ( ! response . IsSuccessStatusCode )
100
+ {
101
+ var responseContent = await response . Content . ReadAsStringAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
102
+ this . Logger . LogInformation ( "Failed to retrieve the AsyncAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'." , uri , response . StatusCode ) ;
103
+ this . Logger . LogDebug ( "Response content:\r \n {responseContent}" , responseContent ?? "None" ) ;
104
+ response . EnsureSuccessStatusCode ( ) ;
105
+ }
106
+ using var responseStream = await response . Content ! . ReadAsStreamAsync ( cancellationToken ) ! ;
107
+ var document = await this . AsyncApiDocumentReader . ReadAsync ( responseStream , cancellationToken ) . ConfigureAwait ( false ) ;
108
+ if ( document is not V3AsyncApiDocument v3Document ) throw new NotSupportedException ( "Synapse only supports AsyncAPI v3.0.0 at the moment" ) ;
109
+ this . Document = v3Document ;
110
+ var operationId = this . AsyncApi . OperationRef ;
111
+ if ( operationId . IsRuntimeExpression ( ) ) operationId = await this . Task . Workflow . Expressions . EvaluateAsync < string > ( operationId , this . Task . Input , this . GetExpressionEvaluationArguments ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
112
+ if ( string . IsNullOrWhiteSpace ( operationId ) ) throw new NullReferenceException ( "The operation ref cannot be null or empty" ) ;
113
+ var operation = this . Document . Operations . FirstOrDefault ( o => o . Key == operationId ) ;
114
+ if ( operation . Value == null ) throw new NullReferenceException ( $ "Failed to find an operation with id '{ operationId } ' in AsyncAPI document at '{ uri } '") ;
115
+ if ( this . AsyncApi . Authentication != null ) this . Authorization = await AuthorizationInfo . CreateAsync ( this . AsyncApi . Authentication , this . ServiceProvider , this . Task . Workflow . Definition , cancellationToken ) . ConfigureAwait ( false ) ;
116
+ switch ( this . Operation . Value . Action )
117
+ {
118
+ case V3OperationAction . Receive :
119
+ await this . BuildMessagePayloadAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
120
+ await this . BuildMessageHeadersAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
121
+ break ;
122
+ case V3OperationAction . Send :
123
+
124
+ break ;
125
+ default :
126
+ throw new NotSupportedException ( $ "The specified operation action '{ this . Operation . Value . Action } ' is not supported") ;
127
+ }
128
+ }
129
+
130
+ /// <summary>
131
+ /// Builds the payload, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
132
+ /// </summary>
133
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
134
+ /// <returns>A new awaitable <see cref="Task"/></returns>
135
+ protected virtual async Task BuildMessagePayloadAsync ( CancellationToken cancellationToken = default )
136
+ {
137
+ if ( this . AsyncApi == null || this . Operation == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
138
+ if ( this . Task . Input == null ) this . MessagePayload = new { } ;
139
+ if ( this . AsyncApi . Payload == null ) return ;
140
+ var arguments = this . GetExpressionEvaluationArguments ( ) ;
141
+ if ( this . Authorization != null )
142
+ {
143
+ arguments ??= new Dictionary < string , object > ( ) ;
144
+ arguments . Add ( "authorization" , this . Authorization ) ;
145
+ }
146
+ this . MessagePayload = await this . Task . Workflow . Expressions . EvaluateAsync < object > ( this . AsyncApi . Payload , this . Task . Input ! , arguments , cancellationToken ) . ConfigureAwait ( false ) ;
147
+ }
148
+
149
+ /// <summary>
150
+ /// Builds the headers, if any, of the message to publish, in case the <see cref="Operation"/>'s <see cref="V3OperationDefinition.Action"/> has been set to <see cref="V3OperationAction.Receive"/>
151
+ /// </summary>
152
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
153
+ /// <returns>A new awaitable <see cref="Task"/></returns>
154
+ protected virtual async Task BuildMessageHeadersAsync ( CancellationToken cancellationToken = default )
155
+ {
156
+ if ( this . AsyncApi == null || this . Operation == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
157
+ if ( this . AsyncApi . Headers == null ) return ;
158
+ var arguments = this . GetExpressionEvaluationArguments ( ) ;
159
+ if ( this . Authorization != null )
160
+ {
161
+ arguments ??= new Dictionary < string , object > ( ) ;
162
+ arguments . Add ( "authorization" , this . Authorization ) ;
163
+ }
164
+ this . MessageHeaders = await this . Task . Workflow . Expressions . EvaluateAsync < object > ( this . AsyncApi . Headers , this . Task . Input ! , arguments , cancellationToken ) . ConfigureAwait ( false ) ;
165
+ }
166
+
167
+ /// <inheritdoc/>
168
+ protected override Task DoExecuteAsync ( CancellationToken cancellationToken )
169
+ {
170
+ if ( this . AsyncApi == null || this . Document == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
171
+ switch ( this . Operation . Value . Action )
172
+ {
173
+ case V3OperationAction . Receive :
174
+ return this . DoExecutePublishOperationAsync ( cancellationToken ) ;
175
+ case V3OperationAction . Send :
176
+ return this . DoExecuteSubscribeOperationAsync ( cancellationToken ) ;
177
+ default :
178
+ throw new NotSupportedException ( $ "The specified operation action '{ this . Operation . Value . Action } ' is not supported") ;
179
+ }
180
+ }
181
+
182
+ /// <summary>
183
+ /// Executes an AsyncAPI publish operation
184
+ /// </summary>
185
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
186
+ /// <returns>A new awaitable <see cref="Task"/></returns>
187
+ protected virtual async Task DoExecutePublishOperationAsync ( CancellationToken cancellationToken )
188
+ {
189
+ if ( this . AsyncApi == null || this . Document == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
190
+ await using var asyncApiClient = this . AsyncApiClientFactory . CreateFor ( this . Document ) ;
191
+ var parameters = new AsyncApiPublishOperationParameters ( this . Operation . Key , this . AsyncApi . Server , this . AsyncApi . Protocol )
192
+ {
193
+ Payload = this . MessagePayload ,
194
+ Headers = this . MessageHeaders
195
+ } ;
196
+ await using var result = await asyncApiClient . PublishAsync ( parameters , cancellationToken ) . ConfigureAwait ( false ) ;
197
+ if ( ! result . IsSuccessful ) throw new Exception ( "Failed to execute the AsyncAPI publish operation" ) ;
198
+ }
199
+
200
+ /// <summary>
201
+ /// Executes an AsyncAPI subscribe operation
202
+ /// </summary>
203
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
204
+ /// <returns>A new awaitable <see cref="Task"/></returns>
205
+ protected virtual async Task DoExecuteSubscribeOperationAsync ( CancellationToken cancellationToken )
206
+ {
207
+ if ( this . AsyncApi == null || this . Document == null || this . Operation . Value == null ) throw new InvalidOperationException ( "The executor must be initialized before execution" ) ;
208
+ await using var asyncApiClient = this . AsyncApiClientFactory . CreateFor ( this . Document ) ;
209
+ var parameters = new AsyncApiSubscribeOperationParameters ( this . Operation . Key , this . AsyncApi . Server , this . AsyncApi . Protocol ) ;
210
+ await using var result = await asyncApiClient . SubscribeAsync ( parameters , cancellationToken ) . ConfigureAwait ( false ) ;
211
+ if ( ! result . IsSuccessful ) throw new Exception ( "Failed to execute the AsyncAPI subscribe operation" ) ;
212
+ }
213
+
214
+ }
0 commit comments