Skip to content

Allow many brokers for Kafka #1428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public class KafkaTransport : BrokerTransport<KafkaTopic>
public AdminClientConfig AdminClientConfig { get; } = new();
public Action<AdminClientBuilder> ConfigureAdminClientBuilders { get; internal set; } = _ => {};

public KafkaTransport() : base("kafka", "Kafka Topics")
public KafkaTransport() : this("Kafka Topics") { }

public KafkaTransport(string brokerName) : base("kafka", brokerName)
{
Topics = new Cache<string, KafkaTopic>(topicName => new KafkaTopic(this, topicName, EndpointRole.Application));
}
Expand Down
22 changes: 11 additions & 11 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ public static class KafkaTransportExtensions
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints)
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints, BrokerName? name = null)
Copy link
Contributor Author

@dominikjeske dominikjeske May 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BrokerName is not passed to GetOrCreate so I added this here

{
var transports = endpoints.As<WolverineOptions>().Transports;

return transports.GetOrCreate<KafkaTransport>();
return transports.GetOrCreate<KafkaTransport>(name);
}

/// <summary>
Expand All @@ -27,11 +27,11 @@ internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints)
/// <param name="options"></param>
/// <param name="configure"></param>
/// <returns></returns>
public static KafkaTransportExpression UseKafka(this WolverineOptions options, string bootstrapServers)
public static KafkaTransportExpression UseKafka(this WolverineOptions options, string bootstrapServers, BrokerName? name = null)
{
// Automatic failure acks do not work with Kafka serialization failures
options.EnableAutomaticFailureAcks = false;
var transport = options.KafkaTransport();
var transport = options.KafkaTransport(name);
transport.ConsumerConfig.BootstrapServers = bootstrapServers;
transport.ProducerConfig.BootstrapServers = bootstrapServers;
transport.AdminClientConfig.BootstrapServers = bootstrapServers;
Expand All @@ -45,9 +45,9 @@ public static KafkaTransportExpression UseKafka(this WolverineOptions options, s
/// <param name="options"></param>
/// <param name="configure"></param>
/// <returns></returns>
public static KafkaTransportExpression ConfigureKafka(this WolverineOptions options, string bootstrapServers)
public static KafkaTransportExpression ConfigureKafka(this WolverineOptions options, string bootstrapServers, BrokerName? name = null)
{
var transport = options.KafkaTransport();
var transport = options.KafkaTransport(name);

return new KafkaTransportExpression(transport, options);
}
Expand All @@ -60,9 +60,9 @@ public static KafkaTransportExpression ConfigureKafka(this WolverineOptions opti
/// <param name="configure">
/// Optional configuration for this Rabbit Mq queue if being initialized by Wolverine
/// <returns></returns>
public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOptions endpoints, string topicName)
public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOptions endpoints, string topicName, BrokerName? name = null)
{
var transport = endpoints.KafkaTransport();
var transport = endpoints.KafkaTransport(name);

var endpoint = transport.Topics[topicName];
endpoint.EndpointName = topicName;
Expand All @@ -77,15 +77,15 @@ public static KafkaListenerConfiguration ListenToKafkaTopic(this WolverineOption
/// <param name="publishing"></param>
/// <param name="topicName"></param>
/// <returns></returns>
public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName)
public static KafkaSubscriberConfiguration ToKafkaTopic(this IPublishToExpression publishing, string topicName, BrokerName? name = null)
{
var transports = publishing.As<PublishingExpression>().Parent.Transports;
var transport = transports.GetOrCreate<KafkaTransport>();
var transport = transports.GetOrCreate<KafkaTransport>(name);

var topic = transport.Topics[topicName];

// This is necessary unfortunately to hook up the subscription rules
publishing.To(topic.Uri);
publishing.To(topic.Uri, name);

return new KafkaSubscriberConfiguration(topic);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Wolverine/Configuration/IPublishToExpression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ public interface IPublishToExpression
/// </summary>
/// <param name="uri"></param>
/// <param name="address"></param>
/// <param name="brokerName"></param>
/// <returns></returns>
ISubscriberConfiguration To(Uri uri);
ISubscriberConfiguration To(Uri uri, BrokerName? brokerName = null);

/// <summary>
/// Send all the matching messages to the designated Uri string
Expand Down
12 changes: 10 additions & 2 deletions src/Wolverine/Configuration/PublishingExpression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ internal PublishingExpression(WolverineOptions parent)
/// </summary>
/// <param name="uri"></param>
/// <returns></returns>
public ISubscriberConfiguration To(Uri uri)
public ISubscriberConfiguration To(Uri uri, BrokerName? brokerName = null)
{
var endpoint = Parent.Transports.GetOrCreateEndpoint(uri);
Endpoint? endpoint = null;
if (brokerName is null)
Copy link
Contributor Author

@dominikjeske dominikjeske May 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using BrokerName Parent.Transports.GetOrCreateEndpoint is not working because we have more then one with same scheme so I added condition. ForScheme has misleading name because it is looking transport name that could be other than scheme.

{
endpoint = Parent.Transports.GetOrCreateEndpoint(uri);
}
else
{
endpoint = Parent.Transports.ForScheme(brokerName.Name)?.GetOrCreateEndpoint(uri) ?? throw new InvalidOperationException($"Broker with name '{brokerName}' not found");
}

AddSubscriber(endpoint);

Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ private async Task startMessagingTransportsAsync()
{
// There are a couple other places where senders might be getting
// started before this point, so latch to avoid double creations
// TODO: this will not work for two brokers
if (_endpoints.HasSender(endpoint.Uri)) continue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is problematic because when having more than one kafka broker we have more endpoints with 'wolverine.topics' so for second broker agent will not be created. I'm not sure how to fix this and not broke something else. (For testing purposes when commented this line of code I could work with success but it may break in other cases).


var agent = endpoint.StartSending(this, replyUri);
Expand Down
8 changes: 4 additions & 4 deletions src/Wolverine/TransportCollection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Wolverine.Configuration;
using Wolverine.Transports;
using Wolverine.Transports.Local;
Expand Down Expand Up @@ -60,7 +61,7 @@ internal void AddPolicy(IEndpointPolicy policy)

public ITransport? ForScheme(string scheme)
{
return _transports.TryGetValue(scheme.ToLowerInvariant(), out var transport)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When transport has not only schemas but broker name it could have names with capitals and when using to lower they are not found.

return _transports.TryGetValue(scheme, out var transport)
? transport
: null;
}
Expand All @@ -85,14 +86,13 @@ public void Add(ITransport transport)
}
else
{
var transport = _transports.Values.OfType<T>().FirstOrDefault(x => x.Protocol == name.Name);
if (transport == null)
if (!_transports.TryGetValue(name.Name, out var transport))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using broker name we should not search by protocol

{
transport = (T)Activator.CreateInstance(typeof(T), name.Name);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is fragile - not every transport has one parameter constructor

_transports[name.Name] = transport;
}

return transport;
return transport.As<T>();
}

}
Expand Down
Loading