From 5dbac8d117ba5a549071301bf8561a7ff94047d6 Mon Sep 17 00:00:00 2001 From: Dominik Jeske Date: Fri, 16 May 2025 15:05:22 +0200 Subject: [PATCH] Allow many brokers for kafka --- .../Internals/KafkaTransport.cs | 4 +++- .../KafkaTransportExtensions.cs | 22 +++++++++---------- .../Configuration/IPublishToExpression.cs | 3 ++- .../Configuration/PublishingExpression.cs | 12 ++++++++-- .../Runtime/WolverineRuntime.HostService.cs | 1 + src/Wolverine/TransportCollection.cs | 8 +++---- 6 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs index cbd5bf588..194bf10a4 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -20,7 +20,9 @@ public class KafkaTransport : BrokerTransport public AdminClientConfig AdminClientConfig { get; } = new(); public Action ConfigureAdminClientBuilders { get; internal set; } = _ => {}; - public KafkaTransport() : base("kafka", "Kafka Topics") + public KafkaTransport() : this("Kafka Topics") { } + + public KafkaTransport(string brokerName) : base("kafka", brokerName) { Topics = new Cache(topicName => new KafkaTopic(this, topicName, EndpointRole.Application)); } diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs index 71d07aaff..234fe50da 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -14,11 +14,11 @@ public static class KafkaTransportExtensions /// /// /// - internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints) + internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints, BrokerName? name = null) { var transports = endpoints.As().Transports; - return transports.GetOrCreate(); + return transports.GetOrCreate(name); } /// @@ -27,11 +27,11 @@ internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints) /// /// /// - public static KafkaTransportExpression UseKafka(this WolverineOptions options, string bootstrapServers) + public static KafkaTransportExpression UseKafka(this WolverineOptions options, string bootstrapServers, BrokerName? name = null) { // Automatic failure acks do not work with Kafka serialization failures options.EnableAutomaticFailureAcks = false; - var transport = options.KafkaTransport(); + var transport = options.KafkaTransport(name); transport.ConsumerConfig.BootstrapServers = bootstrapServers; transport.ProducerConfig.BootstrapServers = bootstrapServers; transport.AdminClientConfig.BootstrapServers = bootstrapServers; @@ -45,9 +45,9 @@ public static KafkaTransportExpression UseKafka(this WolverineOptions options, s /// /// /// - public static KafkaTransportExpression ConfigureKafka(this WolverineOptions options, string bootstrapServers) + public static KafkaTransportExpression ConfigureKafka(this WolverineOptions options, string bootstrapServers, BrokerName? name = null) { - var transport = options.KafkaTransport(); + var transport = options.KafkaTransport(name); return new KafkaTransportExpression(transport, options); } @@ -60,9 +60,9 @@ public static KafkaTransportExpression ConfigureKafka(this WolverineOptions opti /// /// Optional configuration for this Rabbit Mq queue if being initialized by Wolverine /// - public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOptions endpoints, string topicName) + public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOptions endpoints, string topicName, BrokerName? name = null) { - var transport = endpoints.KafkaTransport(); + var transport = endpoints.KafkaTransport(name); var endpoint = transport.Topics[topicName]; endpoint.EndpointName = topicName; @@ -77,15 +77,15 @@ public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOption /// /// /// - public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName) + public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName, BrokerName? name = null) { var transports = publishing.As().Parent.Transports; - var transport = transports.GetOrCreate(); + var transport = transports.GetOrCreate(name); var topic = transport.Topics[topicName]; // This is necessary unfortunately to hook up the subscription rules - publishing.To(topic.Uri); + publishing.To(topic.Uri, name); return new KafkaSubscriberConfiguration(topic); } diff --git a/src/Wolverine/Configuration/IPublishToExpression.cs b/src/Wolverine/Configuration/IPublishToExpression.cs index 4f7337aed..42cb63b3f 100644 --- a/src/Wolverine/Configuration/IPublishToExpression.cs +++ b/src/Wolverine/Configuration/IPublishToExpression.cs @@ -10,8 +10,9 @@ public interface IPublishToExpression /// /// /// + /// /// - ISubscriberConfiguration To(Uri uri); + ISubscriberConfiguration To(Uri uri, BrokerName? brokerName = null); /// /// Send all the matching messages to the designated Uri string diff --git a/src/Wolverine/Configuration/PublishingExpression.cs b/src/Wolverine/Configuration/PublishingExpression.cs index 84f971731..d262973a7 100644 --- a/src/Wolverine/Configuration/PublishingExpression.cs +++ b/src/Wolverine/Configuration/PublishingExpression.cs @@ -26,9 +26,17 @@ internal PublishingExpression(WolverineOptions parent) /// /// /// - public ISubscriberConfiguration To(Uri uri) + public ISubscriberConfiguration To(Uri uri, BrokerName? brokerName = null) { - var endpoint = Parent.Transports.GetOrCreateEndpoint(uri); + Endpoint? endpoint = null; + if (brokerName is null) + { + endpoint = Parent.Transports.GetOrCreateEndpoint(uri); + } + else + { + endpoint = Parent.Transports.ForScheme(brokerName.Name)?.GetOrCreateEndpoint(uri) ?? throw new InvalidOperationException($"Broker with name '{brokerName}' not found"); + } AddSubscriber(endpoint); diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index 52b2c7041..87488798e 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -235,6 +235,7 @@ private async Task startMessagingTransportsAsync() { // There are a couple other places where senders might be getting // started before this point, so latch to avoid double creations + // TODO: this will not work for two brokers if (_endpoints.HasSender(endpoint.Uri)) continue; var agent = endpoint.StartSending(this, replyUri); diff --git a/src/Wolverine/TransportCollection.cs b/src/Wolverine/TransportCollection.cs index 150fcea7b..fcd899663 100644 --- a/src/Wolverine/TransportCollection.cs +++ b/src/Wolverine/TransportCollection.cs @@ -1,5 +1,6 @@ using System.Collections; using JasperFx.Core; +using JasperFx.Core.Reflection; using Wolverine.Configuration; using Wolverine.Transports; using Wolverine.Transports.Local; @@ -60,7 +61,7 @@ internal void AddPolicy(IEndpointPolicy policy) public ITransport? ForScheme(string scheme) { - return _transports.TryGetValue(scheme.ToLowerInvariant(), out var transport) + return _transports.TryGetValue(scheme, out var transport) ? transport : null; } @@ -85,14 +86,13 @@ public void Add(ITransport transport) } else { - var transport = _transports.Values.OfType().FirstOrDefault(x => x.Protocol == name.Name); - if (transport == null) + if (!_transports.TryGetValue(name.Name, out var transport)) { transport = (T)Activator.CreateInstance(typeof(T), name.Name); _transports[name.Name] = transport; } - return transport; + return transport.As(); } }