Skip to content

Commit 4b2b316

Browse files
committed
feat(Runner): Implemented context data inheritance and filtering
Signed-off-by: Charles d'Avernas <charles.davernas@neuroglia.io>
1 parent 6da1e74 commit 4b2b316

22 files changed

+203
-157
lines changed

src/api/Synapse.Api.Client.Core/Services/IDocumentApiClient.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,21 @@ public interface IDocumentApiClient
3636
/// <returns>The <see cref="Document"/> with the specified id</returns>
3737
Task<Document> GetAsync(string id, CancellationToken cancellationToken = default);
3838

39+
/// <summary>
40+
/// Updates the contents of the <see cref="Document"/> with the specified id
41+
/// </summary>
42+
/// <param name="id">The id of the <see cref="Document"/> to update the content of</param>
43+
/// <param name="content">The <see cref="Document"/>'s content</param>
44+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
45+
/// <returns>The <see cref="Document"/> with the specified id</returns>
46+
Task UpdateAsync(string id, object content, CancellationToken cancellationToken = default);
47+
48+
/// <summary>
49+
/// Deletes the <see cref="Document"/> with the specified id
50+
/// </summary>
51+
/// <param name="id">The id of the <see cref="Document"/> to delete</param>
52+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
53+
/// <returns>The <see cref="Document"/> with the specified id</returns>
54+
Task DeletesAsync(string id, CancellationToken cancellationToken = default);
55+
3956
}

src/api/Synapse.Api.Client.Http/Services/DocumentHttpApiClient.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,26 @@ public virtual async Task<Document> GetAsync(string id, CancellationToken cancel
7373
return this.JsonSerializer.Deserialize<Document>(json)!;
7474
}
7575

76+
/// <inheritdoc/>
77+
public virtual async Task UpdateAsync(string id, object content, CancellationToken cancellationToken = default)
78+
{
79+
ArgumentException.ThrowIfNullOrWhiteSpace(id);
80+
var json = this.JsonSerializer.SerializeToText(content);
81+
using var requestContent = new StringContent(json, Encoding.UTF8, MediaTypeNames.Application.Json);
82+
var uri = $"api/v1/workflow-data/{id}";
83+
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Put, uri) { Content = requestContent }, cancellationToken).ConfigureAwait(false);
84+
using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
85+
}
86+
87+
/// <inheritdoc/>
88+
public virtual async Task DeletesAsync(string id, CancellationToken cancellationToken = default)
89+
{
90+
ArgumentException.ThrowIfNullOrWhiteSpace(id);
91+
var uri = $"api/v1/workflow-data/{id}";
92+
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Delete, uri), cancellationToken).ConfigureAwait(false);
93+
using var response = await this.ProcessResponseAsync(await this.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
94+
}
95+
7696
/// <summary>
7797
/// Processes the specified <see cref="HttpRequestMessage"/> before sending it
7898
/// </summary>

src/runner/Synapse.Runner/Services/Executors/ConcurrentCompositeTaskExecutor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
6161
? tasks.Where(t => t.IsOperative)
6262
: this.Tasks
6363
.ToAsyncEnumerable()
64-
.SelectAwait(async kvp => await this.Task.Workflow.CreateTaskAsync(kvp.Value, this.GetPathFor(kvp.Key), this.Task.Input, this.Task, false, cancellationToken).ConfigureAwait(false));
64+
.SelectAwait(async kvp => await this.Task.Workflow.CreateTaskAsync(kvp.Value, this.GetPathFor(kvp.Key), this.Task.Input, null, this.Task, false, cancellationToken).ConfigureAwait(false));
6565
await System.Threading.Tasks.Task.WhenAll(await tasks
6666
.SelectAwait(async task =>
6767
{

src/runner/Synapse.Runner/Services/Executors/ForTaskExecutor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
7575
return;
7676
}
7777
var item = this.Collection.ElementAt(index);
78-
if (task == null) task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Do, this.GetPathFor("0"), this.Task.Input, this.Task, false, cancellationToken).ConfigureAwait(false);
79-
else if(!task.IsOperative) task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Do, this.GetPathFor($"{index + 1}"), this.Task.Input, this.Task, false, cancellationToken).ConfigureAwait(false);
78+
if (task == null) task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Do, this.GetPathFor("0"), this.Task.Input, null, this.Task, false, cancellationToken).ConfigureAwait(false);
79+
else if(!task.IsOperative) task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Do, this.GetPathFor($"{index + 1}"), this.Task.Input, null, this.Task, false, cancellationToken).ConfigureAwait(false);
8080
var contextData = this.Task.ContextData.Clone()!;
8181
var arguments = this.Task.Arguments.Clone()!;
8282
arguments[this.Task.Definition.For.Each ?? RuntimeExpressions.Arguments.Each] = item;
@@ -124,7 +124,7 @@ protected virtual async Task OnIterationCompletedAsync(ITaskExecutor executor, C
124124
switch (executor.Task.Instance.Next)
125125
{
126126
case FlowDirective.Continue:
127-
var next = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Do, this.GetPathFor(index.ToString()), output, this.Task, false, cancellationToken).ConfigureAwait(false);
127+
var next = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Do, this.GetPathFor(index.ToString()), output, null, this.Task, false, cancellationToken).ConfigureAwait(false);
128128
var item = this.Collection.ElementAt(index);
129129
var contextData = this.Task.ContextData.Clone()!;
130130
var arguments = this.Task.Arguments.Clone()!;

src/runner/Synapse.Runner/Services/Executors/SequentialCompositeTaskExecutor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
6767
return;
6868
}
6969
var input = last == null ? this.Task.Input : (await this.Task.Workflow.Documents.GetAsync(last.OutputReference!, cancellationToken).ConfigureAwait(false))!;
70-
var next = await this.Task.Workflow.CreateTaskAsync(nextDefinition.Value, this.GetPathFor(nextDefinition.Key), input, this.Task, false, cancellationToken).ConfigureAwait(false);
70+
var next = await this.Task.Workflow.CreateTaskAsync(nextDefinition.Value, this.GetPathFor(nextDefinition.Key), input, null, this.Task, false, cancellationToken).ConfigureAwait(false);
7171
var executor = await this.CreateTaskExecutorAsync(next, nextDefinition.Value, this.Task.ContextData, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
7272
await executor.ExecuteAsync(cancellationToken).ConfigureAwait(false);
7373
}
@@ -116,7 +116,7 @@ protected virtual async Task OnSubtaskCompletedAsync(ITaskExecutor executor, Can
116116
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
117117
break;
118118
default:
119-
next = await this.Task.Workflow.CreateTaskAsync(nextDefinition.Value, this.GetPathFor(nextDefinition.Key), output, this.Task, false, cancellationToken).ConfigureAwait(false);
119+
next = await this.Task.Workflow.CreateTaskAsync(nextDefinition.Value, this.GetPathFor(nextDefinition.Key), output, null, this.Task, false, cancellationToken).ConfigureAwait(false);
120120
var nextExecutor = await this.CreateTaskExecutorAsync(next, nextDefinition.Value, this.Task.ContextData, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
121121
await nextExecutor.ExecuteAsync(cancellationToken).ConfigureAwait(false);
122122
break;

src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class TryTaskExecutor(IServiceProvider serviceProvider, ILogger<TryTaskEx
3434
/// <inheritdoc/>
3535
protected override async Task DoExecuteAsync(CancellationToken cancellationToken)
3636
{
37-
var task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Try, nameof(this.Task.Definition.Try).ToCamelCase(), this.Task.Input, this.Task, false, cancellationToken).ConfigureAwait(false);
37+
var task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Try, nameof(this.Task.Definition.Try).ToCamelCase(), this.Task.Input, null, this.Task, false, cancellationToken).ConfigureAwait(false);
3838
var executor = await this.CreateTaskExecutorAsync(task, this.Task.Definition.Try, this.Task.ContextData, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
3939
executor.SubscribeAsync
4040
(
@@ -49,7 +49,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
4949
/// <inheritdoc/>
5050
protected override async Task DoRetryAsync(Error cause, CancellationToken cancellationToken)
5151
{
52-
var task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Try, $"retry/{this.Task.Instance.Retries?.Count - 1}", this.Task.Input, this.Task, false, cancellationToken).ConfigureAwait(false);
52+
var task = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Try, $"retry/{this.Task.Instance.Retries?.Count - 1}", this.Task.Input, null, this.Task, false, cancellationToken).ConfigureAwait(false);
5353
var executor = await this.CreateTaskExecutorAsync(task, this.Task.Definition.Try, this.Task.ContextData, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
5454
executor.SubscribeAsync
5555
(
@@ -113,7 +113,7 @@ protected virtual async Task OnTryFaultedAsync(ITaskExecutor executor, Exception
113113
}
114114
if (this.Task.Definition.Catch.Do != null)
115115
{
116-
var next = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Catch.Do, $"{nameof(this.Task.Definition.Catch).ToCamelCase()}/{nameof(ErrorCatcherDefinition.Do).ToCamelCase()}", this.Task.Input, this.Task, false, cancellationToken).ConfigureAwait(false);
116+
var next = await this.Task.Workflow.CreateTaskAsync(this.Task.Definition.Catch.Do, $"{nameof(this.Task.Definition.Catch).ToCamelCase()}/{nameof(ErrorCatcherDefinition.Do).ToCamelCase()}", this.Task.Input, null, this.Task, false, cancellationToken).ConfigureAwait(false);
117117
var arguments = this.Task.Arguments.Clone()!;
118118
arguments[this.Task.Definition.Catch.As ?? RuntimeExpressions.Arguments.Error] = error;
119119
var nextExecutor = await this.CreateTaskExecutorAsync(next, this.Task.Definition.Catch.Do, this.Task.ContextData, arguments, cancellationToken).ConfigureAwait(false);

src/runner/Synapse.Runner/Services/Interfaces/ITaskExecutionContext.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ public interface ITaskExecutionContext
9898
/// <returns>A new awaitable <see cref="Task"/></returns>
9999
Task SetErrorAsync(Error error, CancellationToken cancellationToken = default);
100100

101+
/// <summary>
102+
/// Sets the task's context data
103+
/// </summary>
104+
/// <param name="context">The updated context data</param>
105+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
106+
/// <returns>A new awaitable <see cref="Task"/></returns>
107+
Task SetContextDataAsync(IDictionary<string, object> context, CancellationToken cancellationToken = default);
108+
101109
/// <summary>
102110
/// Sets the <see cref="TaskInstance"/>'s result, if any
103111
/// </summary>

src/runner/Synapse.Runner/Services/Interfaces/IWorkflowExecutionContext.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,12 @@ public interface IWorkflowExecutionContext
7777
/// <param name="definition">The <see cref="TaskDefinition"/> of the <see cref="TaskInstance"/> to create</param>
7878
/// <param name="path">The path used to reference the <see cref="TaskDefinition"/> of the <see cref="TaskInstance"/> to create</param>
7979
/// <param name="input">The input data, if any</param>
80+
/// <param name="context">The task's context data, if any. If not set, the task will inherit its parent's context data</param>
8081
/// <param name="parent">The parent of the <see cref="TaskInstance"/> to create, if any</param>
8182
/// <param name="isExtension">Indicates whether or not the task is part of an extension</param>
8283
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
8384
/// <returns>The updated <see cref="TaskInstance"/></returns>
84-
Task<TaskInstance> CreateTaskAsync(TaskDefinition definition, string path, object input, ITaskExecutionContext? parent = null, bool isExtension = false, CancellationToken cancellationToken = default);
85+
Task<TaskInstance> CreateTaskAsync(TaskDefinition definition, string path, object input, IDictionary<string, object>? context = null, ITaskExecutionContext? parent = null, bool isExtension = false, CancellationToken cancellationToken = default);
8586

8687
/// <summary>
8788
/// Gets the workflow's tasks
@@ -203,6 +204,15 @@ public interface IWorkflowExecutionContext
203204
/// <returns>The updated <see cref="TaskInstance"/></returns>
204205
Task<TaskInstance> SetResultAsync(TaskInstance task, object? result, string? then = FlowDirective.Continue, CancellationToken cancellationToken = default);
205206

207+
/// <summary>
208+
/// Sets the specified workflow data
209+
/// </summary>
210+
/// <param name="reference">A reference to the workflow data to update</param>
211+
/// <param name="data">The updated workflow data</param>
212+
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
213+
/// <returns>A new awaitable <see cref="Task"/></returns>
214+
Task SetWorkflowDataAsync(string reference, object data, CancellationToken cancellationToken = default);
215+
206216
/// <summary>
207217
/// Cancels the workflow's execution
208218
/// </summary>

src/runner/Synapse.Runner/Services/TaskExecutionContext.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ public class TaskExecutionContext<TDefinition>(IWorkflowExecutionContext workflo
4141
public virtual object Input => input;
4242

4343
/// <inheritdoc/>
44-
public virtual IDictionary<string, object> ContextData => contextData;
44+
public virtual IDictionary<string, object> ContextData { get; protected set; } = contextData;
4545

4646
/// <inheritdoc/>
47-
public virtual IDictionary<string, object> Arguments => arguments;
47+
public virtual IDictionary<string, object> Arguments { get; protected set; } = arguments;
4848

4949
/// <inheritdoc/>
5050
public virtual object? Output { get; protected set; }
@@ -98,10 +98,19 @@ public virtual async Task SetResultAsync(object? result, string? then = FlowDire
9898
this.Instance = await this.Workflow.SetResultAsync(this.Instance, this.Output, then, cancellationToken).ConfigureAwait(false);
9999
}
100100

101+
/// <inheritdoc/>
102+
public virtual async Task SetContextDataAsync(IDictionary<string, object> context, CancellationToken cancellationToken = default)
103+
{
104+
ArgumentNullException.ThrowIfNull(context);
105+
if (string.IsNullOrWhiteSpace(this.Instance.ContextReference)) throw new NullReferenceException($"The context reference of the task '{this.Instance.Reference}' must be set");
106+
await this.Workflow.SetWorkflowDataAsync(this.Instance.ContextReference, context, cancellationToken).ConfigureAwait(false);
107+
this.ContextData = context;
108+
}
109+
101110
/// <inheritdoc/>
102111
public virtual async Task CancelAsync(CancellationToken cancellationToken = default)
103112
{
104113
this.Instance = await this.Workflow.CancelAsync(this.Instance, cancellationToken);
105-
}
114+
}
106115

107116
}

src/runner/Synapse.Runner/Services/TaskExecutor.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ protected virtual async Task BeforeExecuteAsync(CancellationToken cancellationTo
157157
foreach (var extension in this.Extensions.Where(ex => ex.Value.Before != null).Reverse())
158158
{
159159
var taskDefinition = extension.Value.Before!;
160-
var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, $"before/{extension.Key}", input, this.Task, true, cancellationToken).ConfigureAwait(false);
160+
var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, $"before/{extension.Key}", input, null, this.Task, true, cancellationToken).ConfigureAwait(false);
161161
var executor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
162162
await executor.ExecuteAsync(cancellationToken).ConfigureAwait(false);
163163
if (executor.Task.Instance.Next == FlowDirective.Exit)
@@ -189,7 +189,7 @@ protected virtual async Task AfterExecuteAsync(CancellationToken cancellationTok
189189
foreach (var extension in this.Extensions.Where(ex => ex.Value.After != null).Reverse())
190190
{
191191
var taskDefinition = extension.Value.After!;
192-
var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, $"after/{extension.Key}", output, this.Task, true, cancellationToken).ConfigureAwait(false);
192+
var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, $"after/{extension.Key}", output, null, this.Task, true, cancellationToken).ConfigureAwait(false);
193193
var executor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
194194
await executor.ExecuteAsync(cancellationToken).ConfigureAwait(false);
195195
if (executor.Task.Instance.Next == FlowDirective.Exit) break;
@@ -258,10 +258,20 @@ public virtual async Task SetResultAsync(object? result, string? then = FlowDire
258258
this.Stopwatch.Stop();
259259
if (string.IsNullOrWhiteSpace(then)) then = FlowDirective.Continue;
260260
var output = result;
261-
if (this.Task.Definition.Output?.From is string fromExpression) output = await this.Task.Workflow.Expressions.EvaluateAsync<object>(fromExpression, output ?? new(), this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
262-
else if (this.Task.Definition.Output?.From != null) output = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.Task.Definition.Output.From, output ?? new(), this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
263-
if (this.Task.Definition.Output?.To is string toExpression) throw new NotImplementedException(); //todo: urgent
264-
else if (this.Task.Definition.Output?.To != null) throw new NotImplementedException(); //todo: urgent
261+
var arguments = this.GetExpressionEvaluationArguments() ?? new Dictionary<string, object>();
262+
arguments["output"] = output!;//todo: replace with arguments[RuntimeExpressions.Arguments.Output] = output;
263+
if (this.Task.Definition.Output?.From is string fromExpression) output = await this.Task.Workflow.Expressions.EvaluateAsync<object>(fromExpression, output ?? new(), arguments, cancellationToken).ConfigureAwait(false);
264+
else if (this.Task.Definition.Output?.From != null) output = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.Task.Definition.Output.From, output ?? new(), arguments, cancellationToken).ConfigureAwait(false);
265+
if (this.Task.Definition.Output?.To is string toExpression)
266+
{
267+
var context = (await this.Task.Workflow.Expressions.EvaluateAsync<IDictionary<string, object>>(toExpression, this.Task.ContextData, arguments, cancellationToken).ConfigureAwait(false))!;
268+
await this.Task.SetContextDataAsync(context, cancellationToken).ConfigureAwait(false);
269+
}
270+
else if (this.Task.Definition.Output?.To != null)
271+
{
272+
var context = (await this.Task.Workflow.Expressions.EvaluateAsync<IDictionary<string, object>>(this.Task.Definition.Output.To, this.Task.ContextData, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false))!;
273+
await this.Task.SetContextDataAsync(context, cancellationToken).ConfigureAwait(false);
274+
}
265275
await this.AfterExecuteAsync(cancellationToken).ConfigureAwait(false); //todo: act upon last directive
266276
await this.DoSetResultAsync(output, then, cancellationToken).ConfigureAwait(false);
267277
await this.Task.SetResultAsync(output, then, cancellationToken).ConfigureAwait(false);

0 commit comments

Comments
 (0)