diff --git a/dotnet/src/Agents/Magentic/MagenticManagerActor.cs b/dotnet/src/Agents/Magentic/MagenticManagerActor.cs
index 5558c04f7ae1..bb6cf93b1b20 100644
--- a/dotnet/src/Agents/Magentic/MagenticManagerActor.cs
+++ b/dotnet/src/Agents/Magentic/MagenticManagerActor.cs
@@ -101,7 +101,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
if (status.IsTaskComplete)
{
ChatMessageContent finalAnswer = await this._manager.PrepareFinalAnswerAsync(context, cancellationToken).ConfigureAwait(false);
- await this.SendMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
break;
}
@@ -142,14 +142,14 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
if (this._invocationCount >= this._manager.MaximumInvocationCount)
{
- await this.SendMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
break;
}
ChatMessageContent instruction = new(AuthorRole.Assistant, agentInstruction);
this._chat.Add(instruction);
await this.PublishMessageAsync(instruction.AsGroupMessage(), this.Context.Topic, messageId: null, cancellationToken).ConfigureAwait(false);
- await this.SendMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false);
break;
}
@@ -158,7 +158,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken)
if (this._retryCount >= this._manager.MaximumResetCount)
{
this.Logger.LogMagenticManagerTaskFailed(this.Context.Topic);
- await this.SendMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false);
break;
}
diff --git a/dotnet/src/Agents/Magentic/MagenticOrchestration.cs b/dotnet/src/Agents/Magentic/MagenticOrchestration.cs
index 3b0f2814aac4..fd99b1620d47 100644
--- a/dotnet/src/Agents/Magentic/MagenticOrchestration.cs
+++ b/dotnet/src/Agents/Magentic/MagenticOrchestration.cs
@@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
{
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
- return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
+ return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
}
///
@@ -65,7 +65,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
}
AgentType managerType =
- await runtime.RegisterAgentFactoryAsync(
+ await runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, "Manager"),
(agentId, runtime) =>
{
@@ -83,7 +83,7 @@ await runtime.RegisterAgentFactoryAsync(
return managerType;
ValueTask RegisterAgentAsync(Agent agent, int agentCount) =>
- runtime.RegisterAgentFactoryAsync(
+ runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, $"Agent_{agentCount}"),
(agentId, runtime) =>
{
diff --git a/dotnet/src/Agents/Orchestration/AgentOrchestration.cs b/dotnet/src/Agents/Orchestration/AgentOrchestration.cs
index 994b611a59ac..c021e03d205b 100644
--- a/dotnet/src/Agents/Orchestration/AgentOrchestration.cs
+++ b/dotnet/src/Agents/Orchestration/AgentOrchestration.cs
@@ -114,7 +114,7 @@ public async ValueTask> InvokeAsync(
logger.LogOrchestrationInvoke(this.OrchestrationLabel, topic);
- Task task = runtime.SendMessageAsync(input, orchestrationType, cancellationToken).AsTask();
+ Task task = runtime.PublishMessageAsync(input, orchestrationType, cancellationToken).AsTask();
logger.LogOrchestrationYield(this.OrchestrationLabel, topic);
@@ -168,7 +168,7 @@ private async ValueTask RegisterAsync(IAgentRuntime runtime, Orchestr
// Register actor for orchestration entry-point
AgentType orchestrationEntry =
- await runtime.RegisterAgentFactoryAsync(
+ await runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, "Boot"),
(agentId, runtime) =>
{
@@ -210,8 +210,8 @@ public sealed class RegistrationContext(
public async ValueTask RegisterResultTypeAsync(OrchestrationResultTransform resultTransform)
{
// Register actor for final result
- return
- await runtime.RegisterAgentFactoryAsync(
+ AgentType registeredType =
+ await runtime.RegisterOrchestrationAgentAsync(
agentType,
(agentId, runtime) =>
{
@@ -229,6 +229,8 @@ await runtime.RegisterAgentFactoryAsync(
return ValueTask.FromResult(actor);
#endif
}).ConfigureAwait(false);
+
+ return registeredType;
}
}
}
diff --git a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs
index fc57ecf236e4..2b5cfa6da5cb 100644
--- a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs
+++ b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs
@@ -38,6 +38,6 @@ public async ValueTask HandleAsync(ConcurrentMessages.Request item, MessageConte
this.Logger.LogConcurrentAgentResult(this.Id, response.Content);
- await this.SendMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false);
}
}
diff --git a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs
index fead042e1a14..87fdba7ce241 100644
--- a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs
+++ b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs
@@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
// Register result actor
AgentType resultType = this.FormatAgentType(context.Topic, "Results");
- await runtime.RegisterAgentFactoryAsync(
+ await runtime.RegisterOrchestrationAgentAsync(
resultType,
(agentId, runtime) =>
{
diff --git a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs
index eb4e1f2994fe..c8772f7b7d34 100644
--- a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs
+++ b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs
@@ -53,7 +53,7 @@ public async ValueTask HandleAsync(ConcurrentMessages.Result item, MessageContex
if (Interlocked.Increment(ref this._resultCount) == this._expectedCount)
{
- await this.SendMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
}
}
}
diff --git a/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs b/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs
index 0bae0bcf2934..1f41e906d09e 100644
--- a/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs
+++ b/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
+using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel.Agents.Runtime;
@@ -15,13 +16,36 @@ public static class RuntimeExtensions
///
/// Sends a message to the specified agent.
///
- public static async ValueTask SendMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default)
+ public static async ValueTask PublishMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default)
{
- AgentId? agentId = await runtime.GetAgentAsync(agentType, lazy: false).ConfigureAwait(false);
- if (agentId.HasValue)
- {
- await runtime.SendMessageAsync(message, agentId.Value, sender: null, messageId: null, cancellationToken).ConfigureAwait(false);
- }
+ await runtime.PublishMessageAsync(message, new TopicId(agentType), sender: null, messageId: null, cancellationToken).ConfigureAwait(false);
+ }
+
+ ///
+ /// Registers an agent factory for the specified agent type and associates it with the runtime.
+ ///
+ /// The runtime targeted for registration.
+ /// The type of agent to register.
+ /// The factory function for creating the agent.
+ /// The registered agent type.
+ public static async ValueTask RegisterOrchestrationAgentAsync(this IAgentRuntime runtime, AgentType agentType, Func> factoryFunc)
+ {
+ AgentType registeredType = await runtime.RegisterAgentFactoryAsync(agentType, factoryFunc).ConfigureAwait(false);
+
+ // Subscribe agent to its own unique topic
+ await runtime.SubscribeAsync(registeredType).ConfigureAwait(false);
+
+ return registeredType;
+ }
+
+ ///
+ /// Subscribes the specified agent type to its own dedicated topic.
+ ///
+ /// The runtime for managing the subscription.
+ /// The agent type to subscribe.
+ public static async Task SubscribeAsync(this IAgentRuntime runtime, string agentType)
+ {
+ await runtime.AddSubscriptionAsync(new TypeSubscription(agentType, agentType)).ConfigureAwait(false);
}
///
diff --git a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs
index cff379ded649..d62d3ce10e55 100644
--- a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs
+++ b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs
@@ -88,13 +88,13 @@ private async ValueTask ManageAsync(MessageContext messageContext)
{
GroupChatManagerResult filterResult = await this._manager.FilterResults(this._chat, messageContext.CancellationToken).ConfigureAwait(false);
this.Logger.LogChatManagerResult(this.Id, filterResult.Value, filterResult.Reason);
- await this.SendMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false);
return;
}
GroupChatManagerResult selectionResult = await this._manager.SelectNextAgent(this._chat, this._team, messageContext.CancellationToken).ConfigureAwait(false);
AgentType selectionType = this._team[selectionResult.Value].Type;
this.Logger.LogChatManagerSelect(this.Id, selectionType);
- await this.SendMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false);
}
}
diff --git a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs
index d2ed007d62d1..0c083ab2ebaf 100644
--- a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs
+++ b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs
@@ -39,7 +39,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
{
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
- return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
+ return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value);
}
///
@@ -64,7 +64,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE
}
AgentType managerType =
- await runtime.RegisterAgentFactoryAsync(
+ await runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, "Manager"),
(agentId, runtime) =>
{
@@ -82,7 +82,7 @@ await runtime.RegisterAgentFactoryAsync(
return managerType;
ValueTask RegisterAgentAsync(Agent agent, int agentCount) =>
- runtime.RegisterAgentFactoryAsync(
+ runtime.RegisterOrchestrationAgentAsync(
this.FormatAgentType(context.Topic, $"Agent_{agentCount}"),
(agentId, runtime) =>
{
diff --git a/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs b/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs
index 73bba993c84d..5cc387b0398b 100644
--- a/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs
+++ b/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs
@@ -125,7 +125,7 @@ public async ValueTask HandleAsync(HandoffMessages.Request item, MessageContext
if (this._handoffAgent != null)
{
AgentType handoffType = this._handoffs[this._handoffAgent].AgentType;
- await this.SendMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false);
this._handoffAgent = null;
break;
@@ -183,6 +183,6 @@ private async ValueTask EndAsync(string summary, CancellationToken cancellationT
{
this.Logger.LogHandoffSummary(this.Id, summary);
this._taskSummary = summary;
- await this.SendMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs b/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs
index 04d75cef719a..26cb306799f8 100644
--- a/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs
+++ b/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs
@@ -53,7 +53,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
await runtime.PublishMessageAsync(input.AsInputTaskMessage(), topic).ConfigureAwait(false);
- await runtime.SendMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false);
+ await runtime.PublishMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false);
}
///
@@ -71,7 +71,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
HandoffLookup map = [];
handoffMap[agent.Name ?? agent.Id] = map;
agentType =
- await runtime.RegisterAgentFactoryAsync(
+ await runtime.RegisterOrchestrationAgentAsync(
this.GetAgentType(context.Topic, index),
(agentId, runtime) =>
{
diff --git a/dotnet/src/Agents/Orchestration/OrchestrationActor.cs b/dotnet/src/Agents/Orchestration/OrchestrationActor.cs
index 1e7866d2ca86..7b5e093ed451 100644
--- a/dotnet/src/Agents/Orchestration/OrchestrationActor.cs
+++ b/dotnet/src/Agents/Orchestration/OrchestrationActor.cs
@@ -34,18 +34,11 @@ protected OrchestrationActor(AgentId id, IAgentRuntime runtime, OrchestrationCon
/// The recipient agent's type.
/// A token used to cancel the operation if needed.
/// The agent identifier, if it exists.
- protected async ValueTask SendMessageAsync(
+ protected async ValueTask PublishMessageAsync(
object message,
AgentType agentType,
CancellationToken cancellationToken = default)
{
- AgentId? agentId = await this.GetAgentAsync(agentType, cancellationToken).ConfigureAwait(false);
-
- if (agentId.HasValue)
- {
- await this.SendMessageAsync(message, agentId.Value, messageId: null, cancellationToken).ConfigureAwait(false);
- }
-
- return agentId;
+ await base.PublishMessageAsync(message, new TopicId(agentType), messageId: null, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs b/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs
index 8af67d287176..c563104cc298 100644
--- a/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs
+++ b/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs
@@ -56,6 +56,6 @@ private async ValueTask InvokeAgentAsync(IList input, Messag
this.Logger.LogSequentialAgentResult(this.Id, response.Content);
- await this.SendMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false);
+ await this.PublishMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false);
}
}
diff --git a/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs b/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs
index be13c1f87fe3..698929dd7172 100644
--- a/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs
+++ b/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs
@@ -31,7 +31,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
{
throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent));
}
- await runtime.SendMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false);
+ await runtime.PublishMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false);
}
///
@@ -52,11 +52,12 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top
return nextAgent;
ValueTask RegisterAgentAsync(Agent agent, int index, AgentType nextAgent) =>
- runtime.RegisterAgentFactoryAsync(
+ runtime.RegisterOrchestrationAgentAsync(
this.GetAgentType(context.Topic, index),
(agentId, runtime) =>
{
SequentialActor actor = new(agentId, runtime, context, agent, nextAgent, context.LoggerFactory.CreateLogger());
+
#if !NETCOREAPP
return actor.AsValueTask();
#else