Skip to content

.NET Agents - Add streaming support to orchestrations #12455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@ namespace GettingStarted.Orchestration;
/// </summary>
public class Step01_Concurrent(ITestOutputHelper output) : BaseOrchestrationTest(output)
{
[Fact]
public async Task ConcurrentTaskAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task ConcurrentTaskAsync(bool streamedResponse)
{
// Define the agents
ChatCompletionAgent physicist =
this.CreateAgent(
instructions: "You are an expert in physics. You answer questions from a physics perspective.",
name: "Physicist",
description: "An expert in physics");
ChatCompletionAgent chemist =
this.CreateAgent(
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective.",
name: "Chemist",
description: "An expert in chemistry");

// Create a monitor to capturing agent responses (via ResponseCallback)
Expand All @@ -36,8 +40,9 @@ public async Task ConcurrentTaskAsync()
ConcurrentOrchestration orchestration =
new(physicist, chemist)
{
ResponseCallback = monitor.ResponseCallback,
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
};

// Start the runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ namespace GettingStarted.Orchestration;
/// </summary>
public class Step02_Sequential(ITestOutputHelper output) : BaseOrchestrationTest(output)
{
[Fact]
public async Task SequentialTaskAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task SequentialTaskAsync(bool streamedResponse)
{
// Define the agents
ChatCompletionAgent analystAgent =
Expand Down Expand Up @@ -58,8 +60,9 @@ give format and make it polished. Output the final improved copy as a single tex
SequentialOrchestration orchestration =
new(analystAgent, writerAgent, editorAgent)
{
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
LoggerFactory = this.LoggerFactory
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
};

// Start the runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ namespace GettingStarted.Orchestration;
/// </remarks>
public class Step03_GroupChat(ITestOutputHelper output) : BaseOrchestrationTest(output)
{
[Fact]
public async Task GroupChatAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task GroupChatAsync(bool streamedResponse)
{
// Define the agents
ChatCompletionAgent writer =
Expand Down Expand Up @@ -62,8 +64,9 @@ Consider suggestions when refining an idea.
writer,
editor)
{
ResponseCallback = monitor.ResponseCallback,
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
};

// Start the runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ Consider suggestions when refining an idea.
If not, provide insight on how to refine suggested copy without example.
""");

// Create a monitor to capturing agent responses (via ResponseCallback)
// to display at the end of this sample. (optional)
// NOTE: Create your own callback to capture responses in your application or service.
OrchestrationMonitor monitor = new();

// Define the orchestration
GroupChatOrchestration orchestration =
new(
Expand All @@ -59,7 +64,8 @@ Consider suggestions when refining an idea.
writer,
editor)
{
LoggerFactory = this.LoggerFactory
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
};

// Start the runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ You are in a debate. Feel free to challenge the other participants with respect.
You are in a debate. Feel free to challenge the other participants with respect.
""");

// Create a monitor to capturing agent responses (via ResponseCallback)
// to display at the end of this sample. (optional)
// NOTE: Create your own callback to capture responses in your application or service.
OrchestrationMonitor monitor = new();

// Define the orchestration
const string topic = "What does a good life mean to you personally?";
Kernel kernel = this.CreateKernelWithChatCompletion();
Expand All @@ -127,7 +132,8 @@ You are in a debate. Feel free to challenge the other participants with respect.
immigrant,
doctor)
{
LoggerFactory = this.LoggerFactory
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
};

// Start the runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ namespace GettingStarted.Orchestration;
/// </summary>
public class Step04_Handoff(ITestOutputHelper output) : BaseOrchestrationTest(output)
{
[Fact]
public async Task OrderSupportAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task OrderSupportAsync(bool streamedResponse)
{
// Define the agents & tools
ChatCompletionAgent triageAgent =
Expand Down Expand Up @@ -76,8 +78,9 @@ public async Task OrderSupportAsync()
Console.WriteLine($"\n# INPUT: {input}\n");
return ValueTask.FromResult(new ChatMessageContent(AuthorRole.User, input));
},
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
LoggerFactory = this.LoggerFactory
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
};

// Start the runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public async Task HandoffStructuredInputAsync()
description: "An agent that handles .NET related issues");
dotnetAgent.Kernel.Plugins.Add(plugin);

// Create a monitor to capturing agent responses (via ResponseCallback)
// to display at the end of this sample. (optional)
// NOTE: Create your own callback to capture responses in your application or service.
OrchestrationMonitor monitor = new();

// Define the orchestration
HandoffOrchestration<GithubIssue, string> orchestration =
new(OrchestrationHandoffs
Expand All @@ -49,7 +54,8 @@ public async Task HandoffStructuredInputAsync()
pythonAgent,
dotnetAgent)
{
LoggerFactory = this.LoggerFactory
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
};

GithubIssue input =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ public class Step05_Magentic(ITestOutputHelper output) : BaseOrchestrationTest(o
/// </summary>
protected override bool ForceOpenAI => true;

[Fact]
public async Task MagenticTaskAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task MagenticTaskAsync(bool streamedResponse)
{
// Define the agents
Kernel researchKernel = CreateKernelWithOpenAIChatCompletion(ResearcherModel);
Expand Down Expand Up @@ -64,8 +66,9 @@ await agentsClient.Administration.CreateAgentAsync(
MagenticOrchestration orchestration =
new(manager, researchAgent, coderAgent)
{
ResponseCallback = monitor.ResponseCallback,
LoggerFactory = this.LoggerFactory,
ResponseCallback = monitor.ResponseCallback,
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
};

// Start the runtime
Expand Down
95 changes: 58 additions & 37 deletions dotnet/src/Agents/Orchestration/AgentActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -51,10 +50,7 @@ protected AgentActor(AgentId id, IAgentRuntime runtime, OrchestrationContext con
/// <summary>
/// Optionally overridden to create custom invocation options for the agent.
/// </summary>
protected virtual AgentInvokeOptions? CreateInvokeOptions()
{
return null;
}
protected virtual AgentInvokeOptions CreateInvokeOptions(Func<ChatMessageContent, Task> messageHandler) => new() { OnIntermediateMessage = messageHandler };

/// <summary>
/// Optionally overridden to introduce customer filtering logic for the response callback.
Expand Down Expand Up @@ -89,8 +85,7 @@ protected ValueTask<ChatMessageContent> InvokeAsync(ChatMessageContent input, Ca
}

/// <summary>
/// Invokes the agent with multiple chat messages.
/// Processes the response items and consolidates the messages into a single <see cref="ChatMessageContent"/>.
/// Invokes the agent with input messages and respond with both streamed and regular messages.
/// </summary>
/// <param name="input">The list of chat messages to send.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
Expand All @@ -99,51 +94,77 @@ protected async ValueTask<ChatMessageContent> InvokeAsync(IList<ChatMessageConte
{
this.Context.Cancellation.ThrowIfCancellationRequested();

AgentResponseItem<ChatMessageContent>[] responses =
await this.Agent.InvokeAsync(
input,
this.Thread,
this.GetInvokeOptions(),
cancellationToken).ToArrayAsync(cancellationToken).ConfigureAwait(false);

AgentResponseItem<ChatMessageContent>? firstResponse = responses.FirstOrDefault();
this.Thread ??= firstResponse?.Thread;
ChatMessageContent? response = null;

// The vast majority of responses will be a single message. Responses with multiple messages will have their content merged.
ChatMessageContent response = new(firstResponse?.Message.Role ?? AuthorRole.Assistant, string.Join("\n\n", responses.Select(response => response.Message)))
AgentInvokeOptions options = this.GetInvokeOptions(HandleMessage);
if (this.Context.StreamingResponseCallback == null)
{
// No need to utilize streaming if no callback is provided
await this.InvokeAsync(input, options, cancellationToken).ConfigureAwait(false);
}
else
{
AuthorName = firstResponse?.Message.AuthorName,
};
await this.InvokeStreamingAsync(input, options, cancellationToken).ConfigureAwait(false);
}

return response ?? new ChatMessageContent(AuthorRole.Assistant, string.Empty);

if (this.Context.ResponseCallback is not null && !this.ResponseCallbackFilter(response))
async Task HandleMessage(ChatMessageContent message)
{
await this.Context.ResponseCallback.Invoke(response).ConfigureAwait(false);
response = message; // Keep track of most recent response for both invocation modes

if (this.Context.ResponseCallback is not null && !this.ResponseCallbackFilter(message))
{
await this.Context.ResponseCallback.Invoke(message).ConfigureAwait(false);
}
}
}

return response;
private async Task InvokeAsync(IList<ChatMessageContent> input, AgentInvokeOptions options, CancellationToken cancellationToken)
{
AgentResponseItem<ChatMessageContent>? lastResponse =
await this.Agent.InvokeAsync(
input,
this.Thread,
options,
cancellationToken).LastOrDefaultAsync(cancellationToken).ConfigureAwait(false);

this.Thread ??= lastResponse?.Thread;
}

/// <summary>
/// Invokes the agent and streams chat message responses asynchronously.
/// Yields each streaming message as it becomes available.
/// </summary>
/// <param name="input">The chat message content to send.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the stream.</param>
/// <returns>An asynchronous stream of <see cref="StreamingChatMessageContent"/> responses.</returns>
protected async IAsyncEnumerable<StreamingChatMessageContent> InvokeStreamingAsync(ChatMessageContent input, [EnumeratorCancellation] CancellationToken cancellationToken)
private async Task InvokeStreamingAsync(IList<ChatMessageContent> input, AgentInvokeOptions options, CancellationToken cancellationToken)
{
this.Context.Cancellation.ThrowIfCancellationRequested();
IAsyncEnumerable<AgentResponseItem<StreamingChatMessageContent>> streamedResponses =
this.Agent.InvokeStreamingAsync(
input,
this.Thread,
options,
cancellationToken);

StreamingChatMessageContent? lastStreamedResponse = null;
await foreach (AgentResponseItem<StreamingChatMessageContent> streamedResponse in streamedResponses.ConfigureAwait(false))
{
this.Context.Cancellation.ThrowIfCancellationRequested();

this.Thread ??= streamedResponse.Thread;

await HandleStreamedMessage(lastStreamedResponse, isFinal: false).ConfigureAwait(false);

lastStreamedResponse = streamedResponse.Message;
}

var responseStream = this.Agent.InvokeStreamingAsync([input], this.Thread, this.GetInvokeOptions(), cancellationToken);
await HandleStreamedMessage(lastStreamedResponse, isFinal: true).ConfigureAwait(false);

await foreach (AgentResponseItem<StreamingChatMessageContent> response in responseStream.ConfigureAwait(false))
async ValueTask HandleStreamedMessage(StreamingChatMessageContent? streamedResponse, bool isFinal)
{
this.Thread ??= response.Thread;
yield return response.Message;
if (this.Context.StreamingResponseCallback != null && streamedResponse != null)
{
await this.Context.StreamingResponseCallback.Invoke(streamedResponse, isFinal).ConfigureAwait(false);
}
}
}

private AgentInvokeOptions? GetInvokeOptions() => this._options ??= this.CreateInvokeOptions();
private AgentInvokeOptions GetInvokeOptions(Func<ChatMessageContent, Task> messageHandler) => this._options ??= this.CreateInvokeOptions(messageHandler);

private static string VerifyDescription(Agent agent)
{
Expand Down
20 changes: 19 additions & 1 deletion dotnet/src/Agents/Orchestration/AgentOrchestration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ namespace Microsoft.SemanticKernel.Agents.Orchestration;
/// <param name="response">The agent response</param>
public delegate ValueTask OrchestrationResponseCallback(ChatMessageContent response);

/// <summary>
/// Called to expose the streamed response produced by any agent.
/// </summary>
/// <param name="response">The agent response</param>
/// <param name="isFinal">Indicates if streamed content is final chunk of the message.</param>
public delegate ValueTask OrchestrationStreamingCallback(StreamingChatMessageContent response, bool isFinal);

/// <summary>
/// Called when human interaction is requested.
/// </summary>
Expand Down Expand Up @@ -74,6 +81,11 @@ protected AgentOrchestration(params Agent[] members)
/// </summary>
public OrchestrationResponseCallback? ResponseCallback { get; init; }

/// <summary>
/// Optional callback that is invoked for every agent response.
/// </summary>
public OrchestrationStreamingCallback? StreamingResponseCallback { get; init; }

/// <summary>
/// Gets the list of member targets involved in the orchestration.
/// </summary>
Expand Down Expand Up @@ -102,7 +114,13 @@ public async ValueTask<OrchestrationResult<TOutput>> InvokeAsync(

CancellationTokenSource orchestrationCancelSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

OrchestrationContext context = new(this.OrchestrationLabel, topic, this.ResponseCallback, this.LoggerFactory, cancellationToken);
OrchestrationContext context =
new(this.OrchestrationLabel,
topic,
this.ResponseCallback,
this.StreamingResponseCallback,
this.LoggerFactory,
cancellationToken);

ILogger logger = this.LoggerFactory.CreateLogger(this.GetType());

Expand Down
5 changes: 3 additions & 2 deletions dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public HandoffActor(AgentId id, IAgentRuntime runtime, OrchestrationContext cont
protected override bool ResponseCallbackFilter(ChatMessageContent response) => response.Role == AuthorRole.Tool;

/// <inheritdoc/>
protected override AgentInvokeOptions? CreateInvokeOptions()
protected override AgentInvokeOptions CreateInvokeOptions(Func<ChatMessageContent, Task> messageHandler)
{
// Clone kernel to avoid modifying the original
Kernel kernel = this.Agent.Kernel.Clone();
Expand All @@ -71,7 +71,8 @@ public HandoffActor(AgentId id, IAgentRuntime runtime, OrchestrationContext cont
new()
{
Kernel = kernel,
KernelArguments = new(new PromptExecutionSettings { FunctionChoiceBehavior = FunctionChoiceBehavior.Auto() })
KernelArguments = new(new PromptExecutionSettings { FunctionChoiceBehavior = FunctionChoiceBehavior.Auto() }),
OnIntermediateMessage = messageHandler,
};

return options;
Expand Down
Loading
Loading