-
-
Notifications
You must be signed in to change notification settings - Fork 193
Allow many brokers for Kafka #1428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,9 +26,17 @@ internal PublishingExpression(WolverineOptions parent) | |
/// </summary> | ||
/// <param name="uri"></param> | ||
/// <returns></returns> | ||
public ISubscriberConfiguration To(Uri uri) | ||
public ISubscriberConfiguration To(Uri uri, BrokerName? brokerName = null) | ||
{ | ||
var endpoint = Parent.Transports.GetOrCreateEndpoint(uri); | ||
Endpoint? endpoint = null; | ||
if (brokerName is null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When using BrokerName Parent.Transports.GetOrCreateEndpoint is not working because we have more then one with same scheme so I added condition. ForScheme has misleading name because it is looking transport name that could be other than scheme. |
||
{ | ||
endpoint = Parent.Transports.GetOrCreateEndpoint(uri); | ||
} | ||
else | ||
{ | ||
endpoint = Parent.Transports.ForScheme(brokerName.Name)?.GetOrCreateEndpoint(uri) ?? throw new InvalidOperationException($"Broker with name '{brokerName}' not found"); | ||
} | ||
|
||
AddSubscriber(endpoint); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -235,6 +235,7 @@ private async Task startMessagingTransportsAsync() | |
{ | ||
// There are a couple other places where senders might be getting | ||
// started before this point, so latch to avoid double creations | ||
// TODO: this will not work for two brokers | ||
if (_endpoints.HasSender(endpoint.Uri)) continue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is problematic because when having more than one kafka broker we have more endpoints with 'wolverine.topics' so for second broker agent will not be created. I'm not sure how to fix this and not broke something else. (For testing purposes when commented this line of code I could work with success but it may break in other cases). |
||
|
||
var agent = endpoint.StartSending(this, replyUri); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
using System.Collections; | ||
using JasperFx.Core; | ||
using JasperFx.Core.Reflection; | ||
using Wolverine.Configuration; | ||
using Wolverine.Transports; | ||
using Wolverine.Transports.Local; | ||
|
@@ -60,7 +61,7 @@ internal void AddPolicy(IEndpointPolicy policy) | |
|
||
public ITransport? ForScheme(string scheme) | ||
{ | ||
return _transports.TryGetValue(scheme.ToLowerInvariant(), out var transport) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When transport has not only schemas but broker name it could have names with capitals and when using to lower they are not found. |
||
return _transports.TryGetValue(scheme, out var transport) | ||
? transport | ||
: null; | ||
} | ||
|
@@ -85,14 +86,13 @@ public void Add(ITransport transport) | |
} | ||
else | ||
{ | ||
var transport = _transports.Values.OfType<T>().FirstOrDefault(x => x.Protocol == name.Name); | ||
if (transport == null) | ||
if (!_transports.TryGetValue(name.Name, out var transport)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When using broker name we should not search by protocol |
||
{ | ||
transport = (T)Activator.CreateInstance(typeof(T), name.Name); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is fragile - not every transport has one parameter constructor |
||
_transports[name.Name] = transport; | ||
} | ||
|
||
return transport; | ||
return transport.As<T>(); | ||
} | ||
|
||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BrokerName is not passed to GetOrCreate so I added this here