-
-
Notifications
You must be signed in to change notification settings - Fork 193
Allow many brokers for Kafka #1428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
@@ -14,11 +14,11 @@ public static class KafkaTransportExtensions | |||
/// </summary> | |||
/// <param name="endpoints"></param> | |||
/// <returns></returns> | |||
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints) | |||
internal static KafkaTransport KafkaTransport(this WolverineOptions endpoints, BrokerName? name = null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BrokerName is not passed to GetOrCreate so I added this here
{ | ||
var endpoint = Parent.Transports.GetOrCreateEndpoint(uri); | ||
Endpoint? endpoint = null; | ||
if (brokerName is null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using BrokerName Parent.Transports.GetOrCreateEndpoint is not working because we have more then one with same scheme so I added condition. ForScheme has misleading name because it is looking transport name that could be other than scheme.
@@ -234,6 +234,7 @@ private async Task startMessagingTransportsAsync() | |||
{ | |||
// There are a couple other places where senders might be getting | |||
// started before this point, so latch to avoid double creations | |||
// TODO: this will not work for two brokers | |||
if (_endpoints.HasSender(endpoint.Uri)) continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is problematic because when having more than one kafka broker we have more endpoints with 'wolverine.topics' so for second broker agent will not be created. I'm not sure how to fix this and not broke something else. (For testing purposes when commented this line of code I could work with success but it may break in other cases).
@@ -60,7 +61,7 @@ internal void AddPolicy(IEndpointPolicy policy) | |||
|
|||
public ITransport? ForScheme(string scheme) | |||
{ | |||
return _transports.TryGetValue(scheme.ToLowerInvariant(), out var transport) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When transport has not only schemas but broker name it could have names with capitals and when using to lower they are not found.
@@ -85,14 +86,13 @@ public void Add(ITransport transport) | |||
} | |||
else | |||
{ | |||
var transport = _transports.Values.OfType<T>().FirstOrDefault(x => x.Protocol == name.Name); | |||
if (transport == null) | |||
if (!_transports.TryGetValue(name.Name, out var transport)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using broker name we should not search by protocol
@@ -85,14 +86,13 @@ public void Add(ITransport transport) | |||
} | |||
else | |||
{ | |||
var transport = _transports.Values.OfType<T>().FirstOrDefault(x => x.Protocol == name.Name); | |||
if (transport == null) | |||
if (!_transports.TryGetValue(name.Name, out var transport)) | |||
{ | |||
transport = (T)Activator.CreateInstance(typeof(T), name.Name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is fragile - not every transport has one parameter constructor
0f71fb0
to
f9ba53d
Compare
f9ba53d
to
5dbac8d
Compare
This PR is not complete because it needs some additional changes. Please read comments in files.