From 0bbb9f8d346943e211fe75ae4d0368f2ef0451e4 Mon Sep 17 00:00:00 2001 From: Chris Rickman Date: Wed, 11 Jun 2025 15:09:52 -0700 Subject: [PATCH] Updated --- .../Agents/Magentic/MagenticManagerActor.cs | 8 ++--- .../Agents/Magentic/MagenticOrchestration.cs | 6 ++-- .../Orchestration/AgentOrchestration.cs | 10 +++--- .../Concurrent/ConcurrentActor.cs | 2 +- .../Concurrent/ConcurrentOrchestration.cs | 2 +- .../Concurrent/ConcurrentResultActor.cs | 2 +- .../Extensions/RuntimeExtensions.cs | 36 +++++++++++++++---- .../GroupChat/GroupChatManagerActor.cs | 4 +-- .../GroupChat/GroupChatOrchestration.cs | 6 ++-- .../Orchestration/Handoff/HandoffActor.cs | 4 +-- .../Handoff/HandoffOrchestration.cs | 4 +-- .../Orchestration/OrchestrationActor.cs | 11 ++---- .../Sequential/SequentialActor.cs | 2 +- .../Sequential/SequentialOrchestration.cs | 5 +-- 14 files changed, 61 insertions(+), 41 deletions(-) diff --git a/dotnet/src/Agents/Magentic/MagenticManagerActor.cs b/dotnet/src/Agents/Magentic/MagenticManagerActor.cs index 5558c04f7ae1..bb6cf93b1b20 100644 --- a/dotnet/src/Agents/Magentic/MagenticManagerActor.cs +++ b/dotnet/src/Agents/Magentic/MagenticManagerActor.cs @@ -101,7 +101,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken) if (status.IsTaskComplete) { ChatMessageContent finalAnswer = await this._manager.PrepareFinalAnswerAsync(context, cancellationToken).ConfigureAwait(false); - await this.SendMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(finalAnswer.AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false); break; } @@ -142,14 +142,14 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken) if (this._invocationCount >= this._manager.MaximumInvocationCount) { - await this.SendMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync("Maximum number of invocations reached.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false); break; } ChatMessageContent instruction = new(AuthorRole.Assistant, agentInstruction); this._chat.Add(instruction); await this.PublishMessageAsync(instruction.AsGroupMessage(), this.Context.Topic, messageId: null, cancellationToken).ConfigureAwait(false); - await this.SendMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(new MagenticMessages.Speak(), agent.Type, cancellationToken).ConfigureAwait(false); break; } @@ -158,7 +158,7 @@ private async ValueTask ManageAsync(CancellationToken cancellationToken) if (this._retryCount >= this._manager.MaximumResetCount) { this.Logger.LogMagenticManagerTaskFailed(this.Context.Topic); - await this.SendMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync("I've experienced multiple failures and am unable to continue.".AsResultMessage(), this._orchestrationType, cancellationToken).ConfigureAwait(false); break; } diff --git a/dotnet/src/Agents/Magentic/MagenticOrchestration.cs b/dotnet/src/Agents/Magentic/MagenticOrchestration.cs index 3b0f2814aac4..fd99b1620d47 100644 --- a/dotnet/src/Agents/Magentic/MagenticOrchestration.cs +++ b/dotnet/src/Agents/Magentic/MagenticOrchestration.cs @@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE { throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent)); } - return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value); + return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value); } /// @@ -65,7 +65,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE } AgentType managerType = - await runtime.RegisterAgentFactoryAsync( + await runtime.RegisterOrchestrationAgentAsync( this.FormatAgentType(context.Topic, "Manager"), (agentId, runtime) => { @@ -83,7 +83,7 @@ await runtime.RegisterAgentFactoryAsync( return managerType; ValueTask RegisterAgentAsync(Agent agent, int agentCount) => - runtime.RegisterAgentFactoryAsync( + runtime.RegisterOrchestrationAgentAsync( this.FormatAgentType(context.Topic, $"Agent_{agentCount}"), (agentId, runtime) => { diff --git a/dotnet/src/Agents/Orchestration/AgentOrchestration.cs b/dotnet/src/Agents/Orchestration/AgentOrchestration.cs index 994b611a59ac..c021e03d205b 100644 --- a/dotnet/src/Agents/Orchestration/AgentOrchestration.cs +++ b/dotnet/src/Agents/Orchestration/AgentOrchestration.cs @@ -114,7 +114,7 @@ public async ValueTask> InvokeAsync( logger.LogOrchestrationInvoke(this.OrchestrationLabel, topic); - Task task = runtime.SendMessageAsync(input, orchestrationType, cancellationToken).AsTask(); + Task task = runtime.PublishMessageAsync(input, orchestrationType, cancellationToken).AsTask(); logger.LogOrchestrationYield(this.OrchestrationLabel, topic); @@ -168,7 +168,7 @@ private async ValueTask RegisterAsync(IAgentRuntime runtime, Orchestr // Register actor for orchestration entry-point AgentType orchestrationEntry = - await runtime.RegisterAgentFactoryAsync( + await runtime.RegisterOrchestrationAgentAsync( this.FormatAgentType(context.Topic, "Boot"), (agentId, runtime) => { @@ -210,8 +210,8 @@ public sealed class RegistrationContext( public async ValueTask RegisterResultTypeAsync(OrchestrationResultTransform resultTransform) { // Register actor for final result - return - await runtime.RegisterAgentFactoryAsync( + AgentType registeredType = + await runtime.RegisterOrchestrationAgentAsync( agentType, (agentId, runtime) => { @@ -229,6 +229,8 @@ await runtime.RegisterAgentFactoryAsync( return ValueTask.FromResult(actor); #endif }).ConfigureAwait(false); + + return registeredType; } } } diff --git a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs index fc57ecf236e4..2b5cfa6da5cb 100644 --- a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs +++ b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentActor.cs @@ -38,6 +38,6 @@ public async ValueTask HandleAsync(ConcurrentMessages.Request item, MessageConte this.Logger.LogConcurrentAgentResult(this.Id, response.Content); - await this.SendMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(response.AsResultMessage(), this._handoffActor, messageContext.CancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs index fead042e1a14..87fdba7ce241 100644 --- a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs +++ b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentOrchestration.cs @@ -40,7 +40,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE // Register result actor AgentType resultType = this.FormatAgentType(context.Topic, "Results"); - await runtime.RegisterAgentFactoryAsync( + await runtime.RegisterOrchestrationAgentAsync( resultType, (agentId, runtime) => { diff --git a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs index eb4e1f2994fe..c8772f7b7d34 100644 --- a/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs +++ b/dotnet/src/Agents/Orchestration/Concurrent/ConcurrentResultActor.cs @@ -53,7 +53,7 @@ public async ValueTask HandleAsync(ConcurrentMessages.Result item, MessageContex if (Interlocked.Increment(ref this._resultCount) == this._expectedCount) { - await this.SendMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(this._results.ToArray(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false); } } } diff --git a/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs b/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs index 0bae0bcf2934..1f41e906d09e 100644 --- a/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs +++ b/dotnet/src/Agents/Orchestration/Extensions/RuntimeExtensions.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Threading; using System.Threading.Tasks; using Microsoft.SemanticKernel.Agents.Runtime; @@ -15,13 +16,36 @@ public static class RuntimeExtensions /// /// Sends a message to the specified agent. /// - public static async ValueTask SendMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default) + public static async ValueTask PublishMessageAsync(this IAgentRuntime runtime, object message, AgentType agentType, CancellationToken cancellationToken = default) { - AgentId? agentId = await runtime.GetAgentAsync(agentType, lazy: false).ConfigureAwait(false); - if (agentId.HasValue) - { - await runtime.SendMessageAsync(message, agentId.Value, sender: null, messageId: null, cancellationToken).ConfigureAwait(false); - } + await runtime.PublishMessageAsync(message, new TopicId(agentType), sender: null, messageId: null, cancellationToken).ConfigureAwait(false); + } + + /// + /// Registers an agent factory for the specified agent type and associates it with the runtime. + /// + /// The runtime targeted for registration. + /// The type of agent to register. + /// The factory function for creating the agent. + /// The registered agent type. + public static async ValueTask RegisterOrchestrationAgentAsync(this IAgentRuntime runtime, AgentType agentType, Func> factoryFunc) + { + AgentType registeredType = await runtime.RegisterAgentFactoryAsync(agentType, factoryFunc).ConfigureAwait(false); + + // Subscribe agent to its own unique topic + await runtime.SubscribeAsync(registeredType).ConfigureAwait(false); + + return registeredType; + } + + /// + /// Subscribes the specified agent type to its own dedicated topic. + /// + /// The runtime for managing the subscription. + /// The agent type to subscribe. + public static async Task SubscribeAsync(this IAgentRuntime runtime, string agentType) + { + await runtime.AddSubscriptionAsync(new TypeSubscription(agentType, agentType)).ConfigureAwait(false); } /// diff --git a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs index cff379ded649..d62d3ce10e55 100644 --- a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs +++ b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatManagerActor.cs @@ -88,13 +88,13 @@ private async ValueTask ManageAsync(MessageContext messageContext) { GroupChatManagerResult filterResult = await this._manager.FilterResults(this._chat, messageContext.CancellationToken).ConfigureAwait(false); this.Logger.LogChatManagerResult(this.Id, filterResult.Value, filterResult.Reason); - await this.SendMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(filterResult.Value.AsResultMessage(), this._orchestrationType, messageContext.CancellationToken).ConfigureAwait(false); return; } GroupChatManagerResult selectionResult = await this._manager.SelectNextAgent(this._chat, this._team, messageContext.CancellationToken).ConfigureAwait(false); AgentType selectionType = this._team[selectionResult.Value].Type; this.Logger.LogChatManagerSelect(this.Id, selectionType); - await this.SendMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(new GroupChatMessages.Speak(), selectionType, messageContext.CancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs index d2ed007d62d1..0c083ab2ebaf 100644 --- a/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs +++ b/dotnet/src/Agents/Orchestration/GroupChat/GroupChatOrchestration.cs @@ -39,7 +39,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE { throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent)); } - return runtime.SendMessageAsync(input.AsInputTaskMessage(), entryAgent.Value); + return runtime.PublishMessageAsync(input.AsInputTaskMessage(), entryAgent.Value); } /// @@ -64,7 +64,7 @@ protected override ValueTask StartAsync(IAgentRuntime runtime, TopicId topic, IE } AgentType managerType = - await runtime.RegisterAgentFactoryAsync( + await runtime.RegisterOrchestrationAgentAsync( this.FormatAgentType(context.Topic, "Manager"), (agentId, runtime) => { @@ -82,7 +82,7 @@ await runtime.RegisterAgentFactoryAsync( return managerType; ValueTask RegisterAgentAsync(Agent agent, int agentCount) => - runtime.RegisterAgentFactoryAsync( + runtime.RegisterOrchestrationAgentAsync( this.FormatAgentType(context.Topic, $"Agent_{agentCount}"), (agentId, runtime) => { diff --git a/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs b/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs index 73bba993c84d..5cc387b0398b 100644 --- a/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs +++ b/dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs @@ -125,7 +125,7 @@ public async ValueTask HandleAsync(HandoffMessages.Request item, MessageContext if (this._handoffAgent != null) { AgentType handoffType = this._handoffs[this._handoffAgent].AgentType; - await this.SendMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(new HandoffMessages.Request(), handoffType, messageContext.CancellationToken).ConfigureAwait(false); this._handoffAgent = null; break; @@ -183,6 +183,6 @@ private async ValueTask EndAsync(string summary, CancellationToken cancellationT { this.Logger.LogHandoffSummary(this.Id, summary); this._taskSummary = summary; - await this.SendMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(new HandoffMessages.Result { Message = new ChatMessageContent(AuthorRole.Assistant, summary) }, this._resultHandoff, cancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs b/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs index 04d75cef719a..26cb306799f8 100644 --- a/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs +++ b/dotnet/src/Agents/Orchestration/Handoff/HandoffOrchestration.cs @@ -53,7 +53,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent)); } await runtime.PublishMessageAsync(input.AsInputTaskMessage(), topic).ConfigureAwait(false); - await runtime.SendMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false); + await runtime.PublishMessageAsync(new HandoffMessages.Request(), entryAgent.Value).ConfigureAwait(false); } /// @@ -71,7 +71,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top HandoffLookup map = []; handoffMap[agent.Name ?? agent.Id] = map; agentType = - await runtime.RegisterAgentFactoryAsync( + await runtime.RegisterOrchestrationAgentAsync( this.GetAgentType(context.Topic, index), (agentId, runtime) => { diff --git a/dotnet/src/Agents/Orchestration/OrchestrationActor.cs b/dotnet/src/Agents/Orchestration/OrchestrationActor.cs index 1e7866d2ca86..7b5e093ed451 100644 --- a/dotnet/src/Agents/Orchestration/OrchestrationActor.cs +++ b/dotnet/src/Agents/Orchestration/OrchestrationActor.cs @@ -34,18 +34,11 @@ protected OrchestrationActor(AgentId id, IAgentRuntime runtime, OrchestrationCon /// The recipient agent's type. /// A token used to cancel the operation if needed. /// The agent identifier, if it exists. - protected async ValueTask SendMessageAsync( + protected async ValueTask PublishMessageAsync( object message, AgentType agentType, CancellationToken cancellationToken = default) { - AgentId? agentId = await this.GetAgentAsync(agentType, cancellationToken).ConfigureAwait(false); - - if (agentId.HasValue) - { - await this.SendMessageAsync(message, agentId.Value, messageId: null, cancellationToken).ConfigureAwait(false); - } - - return agentId; + await base.PublishMessageAsync(message, new TopicId(agentType), messageId: null, cancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs b/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs index 8af67d287176..c563104cc298 100644 --- a/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs +++ b/dotnet/src/Agents/Orchestration/Sequential/SequentialActor.cs @@ -56,6 +56,6 @@ private async ValueTask InvokeAgentAsync(IList input, Messag this.Logger.LogSequentialAgentResult(this.Id, response.Content); - await this.SendMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false); + await this.PublishMessageAsync(response.AsResponseMessage(), this._nextAgent, messageContext.CancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs b/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs index be13c1f87fe3..698929dd7172 100644 --- a/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs +++ b/dotnet/src/Agents/Orchestration/Sequential/SequentialOrchestration.cs @@ -31,7 +31,7 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top { throw new ArgumentException("Entry agent is not defined.", nameof(entryAgent)); } - await runtime.SendMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false); + await runtime.PublishMessageAsync(input.AsRequestMessage(), entryAgent.Value).ConfigureAwait(false); } /// @@ -52,11 +52,12 @@ protected override async ValueTask StartAsync(IAgentRuntime runtime, TopicId top return nextAgent; ValueTask RegisterAgentAsync(Agent agent, int index, AgentType nextAgent) => - runtime.RegisterAgentFactoryAsync( + runtime.RegisterOrchestrationAgentAsync( this.GetAgentType(context.Topic, index), (agentId, runtime) => { SequentialActor actor = new(agentId, runtime, context, agent, nextAgent, context.LoggerFactory.CreateLogger()); + #if !NETCOREAPP return actor.AsValueTask(); #else