Skip to content

Allow many brokers for Kafka #1428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

dominikjeske
Copy link
Contributor

This PR is not complete because it needs some additional changes. Please read comments in files.

@@ -14,11 +14,11 @@ public static class KafkaTransportExtensions
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints)
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints, BrokerName? name = null)
Copy link
Contributor Author

@dominikjeske dominikjeske May 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BrokerName is not passed to GetOrCreate so I added this here

{
var endpoint = Parent.Transports.GetOrCreateEndpoint(uri);
Endpoint? endpoint = null;
if (brokerName is null)
Copy link
Contributor Author

@dominikjeske dominikjeske May 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using BrokerName Parent.Transports.GetOrCreateEndpoint is not working because we have more then one with same scheme so I added condition. ForScheme has misleading name because it is looking transport name that could be other than scheme.

@@ -234,6 +234,7 @@ private async Task startMessagingTransportsAsync()
{
// There are a couple other places where senders might be getting
// started before this point, so latch to avoid double creations
// TODO: this will not work for two brokers
if (_endpoints.HasSender(endpoint.Uri)) continue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is problematic because when having more than one kafka broker we have more endpoints with 'wolverine.topics' so for second broker agent will not be created. I'm not sure how to fix this and not broke something else. (For testing purposes when commented this line of code I could work with success but it may break in other cases).

@@ -60,7 +61,7 @@ internal void AddPolicy(IEndpointPolicy policy)

public ITransport? ForScheme(string scheme)
{
return _transports.TryGetValue(scheme.ToLowerInvariant(), out var transport)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When transport has not only schemas but broker name it could have names with capitals and when using to lower they are not found.

@@ -85,14 +86,13 @@ public void Add(ITransport transport)
}
else
{
var transport = _transports.Values.OfType<T>().FirstOrDefault(x => x.Protocol == name.Name);
if (transport == null)
if (!_transports.TryGetValue(name.Name, out var transport))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using broker name we should not search by protocol

@@ -85,14 +86,13 @@ public void Add(ITransport transport)
}
else
{
var transport = _transports.Values.OfType<T>().FirstOrDefault(x => x.Protocol == name.Name);
if (transport == null)
if (!_transports.TryGetValue(name.Name, out var transport))
{
transport = (T)Activator.CreateInstance(typeof(T), name.Name);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is fragile - not every transport has one parameter constructor

@dominikjeske dominikjeske force-pushed the features/multi_broker branch from 0f71fb0 to f9ba53d Compare May 16, 2025 13:25
@dominikjeske dominikjeske marked this pull request as draft May 19, 2025 17:30
@jeremydmiller jeremydmiller added this to the 4.0 milestone May 27, 2025
@dominikjeske dominikjeske force-pushed the features/multi_broker branch from f9ba53d to 5dbac8d Compare May 28, 2025 16:36
@jeremydmiller jeremydmiller removed this from the 4.0 milestone May 29, 2025
@dominikjeske dominikjeske marked this pull request as ready for review June 10, 2025 19:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants