Skip to content

.NET Agents - Switch all instance of "SendMessage" to "PublishMessage" #12457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dotnet/src/Agents/Magentic/MagenticManagerActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
if (status.IsTaskComplete)
{
ChatMessageContent finalAnswer = await this._manager.PrepareFinalAnswerAsync(context, cancellationToken).ConfigureAwait(false);
await this.SendMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
break;
}

Expand Down Expand Up @@ -142,14 +142,14 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)

if (this._invocationCount >= this._manager.MaximumInvocationCount)
{
await this.SendMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
break;
}

ChatMessageContent instruction = new(AuthorRole.Assistant, agentInstruction);
this._chat.Add(instruction);
await this.PublishMessageAsync(instruction.AsGroupMessage(), this.Context.Topic, messageId: null, cancellationToken).ConfigureAwait(false);
await this.SendMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false);
break;
}

Expand All @@ -158,7 +158,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
if (this._retryCount >= this._manager.MaximumResetCount)
{
this.Logger.LogMagenticManagerTaskFailed(this.Context.Topic);
await this.SendMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
break;
}

Expand Down
6 changes: 3 additions & 3 deletions dotnet/src/Agents/Magentic/MagenticOrchestration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
{
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
}

/// <inheritdoc />
Expand All @@ -65,7 +65,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
}

AgentType managerType =
await runtime.RegisterAgentFactoryAsync(
await runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, "Manager"),
(agentId, runtime) =>
{
Expand All @@ -83,7 +83,7 @@ await runtime.RegisterAgentFactoryAsync(
return managerType;

ValueTask<AgentType> RegisterAgentAsync(Agent agent, int agentCount) =>
runtime.RegisterAgentFactoryAsync(
runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, $"Agent_{agentCount}"),
(agentId, runtime) =>
{
Expand Down
10 changes: 6 additions & 4 deletions dotnet/src/Agents/Orchestration/AgentOrchestration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public async ValueTask<OrchestrationResult<TOutput>> InvokeAsync(

logger.LogOrchestrationInvoke(this.OrchestrationLabel, topic);

Task task = runtime.SendMessageAsync(input, orchestrationType, cancellationToken).AsTask();
Task task = runtime.PublishMessageAsync(input, orchestrationType, cancellationToken).AsTask();

logger.LogOrchestrationYield(this.OrchestrationLabel, topic);

Expand Down Expand Up @@ -168,7 +168,7 @@ private async ValueTask<AgentType> RegisterAsync(IAgentRuntime runtime, Orchestr

// Register actor for orchestration entry-point
AgentType orchestrationEntry =
await runtime.RegisterAgentFactoryAsync(
await runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, "Boot"),
(agentId, runtime) =>
{
Expand Down Expand Up @@ -210,8 +210,8 @@ public sealed class RegistrationContext(
public async ValueTask<AgentType> RegisterResultTypeAsync<TResult>(OrchestrationResultTransform<TResult> resultTransform)
{
// Register actor for final result
return
await runtime.RegisterAgentFactoryAsync(
AgentType registeredType =
await runtime.RegisterOrchestrationAgentAsync(
agentType,
(agentId, runtime) =>
{
Expand All @@ -229,6 +229,8 @@ await runtime.RegisterAgentFactoryAsync(
return ValueTask.FromResult<IHostableAgent>(actor);
#endif
}).ConfigureAwait(false);

return registeredType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public async ValueTask HandleAsync(ConcurrentMessages.Request item, MessageConte

this.Logger.LogConcurrentAgentResult(this.Id, response.Content);

await this.SendMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE

// Register result actor
AgentType resultType = this.FormatAgentType(context.Topic, "Results");
await runtime.RegisterAgentFactoryAsync(
await runtime.RegisterOrchestrationAgentAsync(
resultType,
(agentId, runtime) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async ValueTask HandleAsync(ConcurrentMessages.Result item, MessageContex

if (Interlocked.Increment(ref this._resultCount) == this._expectedCount)
{
await this.SendMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
}
}
}
36 changes: 30 additions & 6 deletions dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel.Agents.Runtime;
Expand All @@ -15,13 +16,36 @@ public static class RuntimeExtensions
/// <summary>
/// Sends a message to the specified agent.
/// </summary>
public static async ValueTask SendMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default)
public static async ValueTask PublishMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default)
{
AgentId? agentId = await runtime.GetAgentAsync(agentType, lazy: false).ConfigureAwait(false);
if (agentId.HasValue)
{
await runtime.SendMessageAsync(message, agentId.Value, sender: null, messageId: null, cancellationToken).ConfigureAwait(false);
}
await runtime.PublishMessageAsync(message, new TopicId(agentType), sender: null, messageId: null, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Registers an agent factory for the specified agent type and associates it with the runtime.
/// </summary>
/// <param name="runtime">The runtime targeted for registration.</param>
/// <param name="agentType">The type of agent to register.</param>
/// <param name="factoryFunc">The factory function for creating the agent.</param>
/// <returns>The registered agent type.</returns>
public static async ValueTask<AgentType> RegisterOrchestrationAgentAsync(this IAgentRuntime runtime, AgentType agentType, Func<AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc)
{
AgentType registeredType = await runtime.RegisterAgentFactoryAsync(agentType, factoryFunc).ConfigureAwait(false);

// Subscribe agent to its own unique topic
await runtime.SubscribeAsync(registeredType).ConfigureAwait(false);

return registeredType;
}

/// <summary>
/// Subscribes the specified agent type to its own dedicated topic.
/// </summary>
/// <param name="runtime">The runtime for managing the subscription.</param>
/// <param name="agentType">The agent type to subscribe.</param>
public static async Task SubscribeAsync(this IAgentRuntime runtime, string agentType)
{
await runtime.AddSubscriptionAsync(new TypeSubscription(agentType, agentType)).ConfigureAwait(false);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ private async ValueTask ManageAsync(MessageContext messageContext)
{
GroupChatManagerResult<string> filterResult = await this._manager.FilterResults(this._chat, messageContext.CancellationToken).ConfigureAwait(false);
this.Logger.LogChatManagerResult(this.Id, filterResult.Value, filterResult.Reason);
await this.SendMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
return;
}

GroupChatManagerResult<string> selectionResult = await this._manager.SelectNextAgent(this._chat, this._team, messageContext.CancellationToken).ConfigureAwait(false);
AgentType selectionType = this._team[selectionResult.Value].Type;
this.Logger.LogChatManagerSelect(this.Id, selectionType);
await this.SendMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
{
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
}

/// <inheritdoc />
Expand All @@ -64,7 +64,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
}

AgentType managerType =
await runtime.RegisterAgentFactoryAsync(
await runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, "Manager"),
(agentId, runtime) =>
{
Expand All @@ -82,7 +82,7 @@ await runtime.RegisterAgentFactoryAsync(
return managerType;

ValueTask<AgentType> RegisterAgentAsync(Agent agent, int agentCount) =>
runtime.RegisterAgentFactoryAsync(
runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, $"Agent_{agentCount}"),
(agentId, runtime) =>
{
Expand Down
4 changes: 2 additions & 2 deletions dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public async ValueTask HandleAsync(HandoffMessages.Request item, MessageContext
if (this._handoffAgent != null)
{
AgentType handoffType = this._handoffs[this._handoffAgent].AgentType;
await this.SendMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false);

this._handoffAgent = null;
break;
Expand Down Expand Up @@ -183,6 +183,6 @@ private async ValueTask EndAsync(string summary, CancellationToken cancellationT
{
this.Logger.LogHandoffSummary(this.Id, summary);
this._taskSummary = summary;
await this.SendMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
await runtime.PublishMessageAsync(input.AsInputTaskMessage(), topic).ConfigureAwait(false);
await runtime.SendMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false);
await runtime.PublishMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false);
}

/// <inheritdoc />
Expand All @@ -71,7 +71,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
HandoffLookup map = [];
handoffMap[agent.Name ?? agent.Id] = map;
agentType =
await runtime.RegisterAgentFactoryAsync(
await runtime.RegisterOrchestrationAgentAsync(
this.GetAgentType(context.Topic, index),
(agentId, runtime) =>
{
Expand Down
11 changes: 2 additions & 9 deletions dotnet/src/Agents/Orchestration/OrchestrationActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,11 @@ protected OrchestrationActor(AgentId id, IAgentRuntime runtime, OrchestrationCon
/// <param name="agentType">The recipient agent's type.</param>
/// <param name="cancellationToken">A token used to cancel the operation if needed.</param>
/// <returns>The agent identifier, if it exists.</returns>
protected async ValueTask<AgentId?> SendMessageAsync(
protected async ValueTask PublishMessageAsync(
object message,
AgentType agentType,
CancellationToken cancellationToken = default)
{
AgentId? agentId = await this.GetAgentAsync(agentType, cancellationToken).ConfigureAwait(false);

if (agentId.HasValue)
{
await this.SendMessageAsync(message, agentId.Value, messageId: null, cancellationToken).ConfigureAwait(false);
}

return agentId;
await base.PublishMessageAsync(message, new TopicId(agentType), messageId: null, cancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ private async ValueTask InvokeAgentAsync(IList<ChatMessageContent> input, Messag

this.Logger.LogSequentialAgentResult(this.Id, response.Content);

await this.SendMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false);
await this.PublishMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
{
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
await runtime.SendMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false);
await runtime.PublishMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false);
}

/// <inheritdoc />
Expand All @@ -52,11 +52,12 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
return nextAgent;

ValueTask<AgentType> RegisterAgentAsync(Agent agent, int index, AgentType nextAgent) =>
runtime.RegisterAgentFactoryAsync(
runtime.RegisterOrchestrationAgentAsync(
this.GetAgentType(context.Topic, index),
(agentId, runtime) =>
{
SequentialActor actor = new(agentId, runtime, context, agent, nextAgent, context.LoggerFactory.CreateLogger<SequentialActor>());

#if !NETCOREAPP
return actor.AsValueTask<IHostableAgent>();
#else
Expand Down
Loading