Skip to content

Commit f98f9c3

Browse files
committed
#33 Implemented custom IEndpointCollection implementation for LoopbackConfigration to support sending to arbitrary named endpoints
1 parent 89394d8 commit f98f9c3

File tree

8 files changed

+167
-221
lines changed

8 files changed

+167
-221
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Platibus.Config;
5+
using Platibus.InMemory;
6+
using Xunit;
7+
8+
namespace Platibus.UnitTests
9+
{
10+
public class LoopbackTests
11+
{
12+
protected LoopbackConfiguration Configuration = new LoopbackConfiguration();
13+
protected object MessageContent = Guid.NewGuid().ToString();
14+
protected SendOptions SendOptions = new SendOptions {ContentType = "text/plain"};
15+
protected CancellationTokenSource CancellationSource = new CancellationTokenSource();
16+
17+
public LoopbackTests()
18+
{
19+
Configuration.MessageQueueingService = new InMemoryMessageQueueingService();
20+
}
21+
22+
[Theory]
23+
[InlineData("loopback")] // Standard loopback endpoint
24+
[InlineData("undefined")] // Undefined/unknown endpoint
25+
public async Task MessageIsHandledWhenSendingToNamedEndpoint(string endpointName)
26+
{
27+
var handlerTask = GivenHandler();
28+
await WhenSendingToNamedEndpoint(endpointName);
29+
var handledContent = await handlerTask;
30+
Assert.Equal(MessageContent, handledContent);
31+
}
32+
33+
[Theory]
34+
[InlineData("urn:localhost/loopback")] // Standard loopback endpoint address
35+
[InlineData("http://localhost/undefined")] // Undefined/unknown endpoint address
36+
public async Task MessageIsHandledWhenSendingToEndpointAddress(string address)
37+
{
38+
var handlerTask = GivenHandler();
39+
await WhenSendingToEndpointAddress(new Uri(address));
40+
var handledContent = await handlerTask;
41+
Assert.Equal(MessageContent, handledContent);
42+
}
43+
44+
protected Task<object> GivenHandler()
45+
{
46+
var handlerCompletionSource = new TaskCompletionSource<object>();
47+
CancellationSource.Token.Register(() => handlerCompletionSource.TrySetCanceled());
48+
Configuration.AddHandlingRule<object>(".*", (content, ctx) =>
49+
{
50+
handlerCompletionSource.TrySetResult(content);
51+
ctx.Acknowledge();
52+
});
53+
return handlerCompletionSource.Task;
54+
}
55+
56+
protected async Task WhenSendingToNamedEndpoint(EndpointName endpointName)
57+
{
58+
var host = await LoopbackHost.Start(Configuration, CancellationSource.Token);
59+
await host.Bus.Send(MessageContent, endpointName, SendOptions, CancellationSource.Token);
60+
}
61+
62+
protected async Task WhenSendingToEndpointAddress(Uri endpointAddress)
63+
{
64+
var host = await LoopbackHost.Start(Configuration, CancellationSource.Token);
65+
await host.Bus.Send(MessageContent, endpointAddress, SendOptions, CancellationSource.Token);
66+
}
67+
}
68+
}

Source/Platibus/Bus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public Bus(IPlatibusConfiguration configuration, Uri baseUri, ITransportService
9191
_messageNamingService = configuration.MessageNamingService;
9292
_serializationService = configuration.SerializationService;
9393

94-
_endpoints = new ReadOnlyEndpointCollection(configuration.Endpoints);
94+
_endpoints = configuration.Endpoints ?? EndpointCollection.Empty;
9595
_topics = configuration.Topics.ToList();
9696
_sendRules = configuration.SendRules.ToList();
9797
_handlingRules = configuration.HandlingRules.ToList();

Source/Platibus/Config/LoopbackConfiguration.cs

Lines changed: 22 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -25,90 +25,48 @@
2525

2626
namespace Platibus.Config
2727
{
28+
/// <inheritdoc cref="PlatibusConfiguration"/>
29+
/// <inheritdoc cref="ILoopbackConfiguration"/>
2830
/// <summary>
2931
/// A loopback configuration
3032
/// </summary>
3133
public class LoopbackConfiguration : PlatibusConfiguration, ILoopbackConfiguration
3234
{
33-
private readonly EndpointName _loopbackEndpoint;
35+
private static readonly EndpointName LoopbackEndpoint = "loopback";
36+
private static readonly Uri LoopbackUri = new Uri("urn:localhost/loopback");
3437

35-
/// <summary>
36-
/// The base URI of the loopback bus instance
37-
/// </summary>
38-
public Uri BaseUri { get; }
38+
/// <inheritdoc />
39+
public Uri BaseUri => LoopbackUri;
3940

40-
/// <summary>
41-
/// The message queueing service to use
42-
/// </summary>
41+
/// <inheritdoc />
4342
public IMessageQueueingService MessageQueueingService { get; set; }
4443

44+
/// <inheritdoc />
45+
/// <summary>
46+
/// Initializes a new <see cref="T:Platibus.Config.LoopbackConfiguration" />
47+
/// </summary>
48+
public LoopbackConfiguration() : this(null)
49+
{
50+
}
4551

52+
/// <inheritdoc />
4653
/// <summary>
47-
/// Initializes a new <see cref="LoopbackConfiguration"/> with a preconfigured
48-
/// <paramref name="diagnosticService"/>
54+
/// Initializes a new <see cref="T:Platibus.Config.LoopbackConfiguration" /> with a preconfigured
55+
/// <paramref name="diagnosticService" />
4956
/// </summary>
5057
/// <param name="diagnosticService">(Optional) The service through which diagnostic events
5158
/// are reported and processed</param>
52-
public LoopbackConfiguration(IDiagnosticService diagnosticService = null) : base(diagnosticService)
59+
public LoopbackConfiguration(IDiagnosticService diagnosticService) : base(diagnosticService, new LoopbackEndpoints(LoopbackEndpoint, LoopbackUri))
5360
{
54-
_loopbackEndpoint = new EndpointName("looback");
55-
BaseUri = new Uri("urn:localhost/loopback");
56-
base.AddEndpoint(_loopbackEndpoint, new Endpoint(BaseUri));
5761
var allMessages = new MessageNamePatternSpecification(".*");
58-
base.AddSendRule(new SendRule(allMessages, _loopbackEndpoint));
62+
base.AddSendRule(new SendRule(allMessages, LoopbackEndpoint));
5963
}
60-
61-
/// <summary>
62-
/// Adds a topic to the configuration
63-
/// </summary>
64-
/// <param name="topic">The name of the topic</param>
65-
/// <remarks>
66-
/// Topics must be explicitly added in order to publish messages to them
67-
/// </remarks>
64+
65+
/// <inheritdoc />
6866
public override void AddTopic(TopicName topic)
6967
{
7068
base.AddTopic(topic);
71-
base.AddSubscription(new Subscription(_loopbackEndpoint, topic));
72-
}
73-
74-
/// <summary>
75-
/// Adds a named endpoint to the configuration
76-
/// </summary>
77-
/// <param name="name">The name of the endpoint</param>
78-
/// <param name="endpoint">The endpoint</param>
79-
/// <remarks>
80-
/// Not supported in loopback configurations
81-
/// </remarks>
82-
/// <exception cref="NotSupportedException">Always thrown</exception>
83-
public override void AddEndpoint(EndpointName name, IEndpoint endpoint)
84-
{
85-
throw new NotSupportedException();
86-
}
87-
88-
/// <summary>
89-
/// Adds a rule governing to which endpoints messages will be sent
90-
/// </summary>
91-
/// <param name="sendRule">The send rule</param>
92-
/// <remarks>
93-
/// Not supported in loopback configurations
94-
/// </remarks>
95-
/// <exception cref="NotSupportedException">Always thrown</exception>
96-
public override void AddSendRule(ISendRule sendRule)
97-
{
98-
throw new NotSupportedException();
99-
}
100-
101-
/// <summary>
102-
/// Adds a subscription to a local or remote topic
103-
/// </summary>
104-
/// <param name="subscription">The subscription</param>
105-
/// <remarks>
106-
/// Not supported in loopback configurations
107-
/// </remarks>
108-
/// <exception cref="NotSupportedException">Always thrown</exception>
109-
public override void AddSubscription(ISubscription subscription)
110-
{
111-
throw new NotSupportedException();
69+
AddSubscription(new Subscription(LoopbackEndpoint, topic));
11270
}
11371
}
11472
}

Source/Platibus/Config/PlatibusConfiguration.cs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828

2929
namespace Platibus.Config
3030
{
31+
/// <inheritdoc />
3132
/// <summary>
32-
/// Concrete mutable implementation of <see cref="IPlatibusConfiguration" /> used for
33+
/// Concrete mutable implementation of <see cref="T:Platibus.Config.IPlatibusConfiguration" /> used for
3334
/// programmatically configuring the bus.
3435
/// </summary>
3536
/// <remarks>
@@ -39,7 +40,7 @@ namespace Platibus.Config
3940
/// </remarks>
4041
public class PlatibusConfiguration : IPlatibusConfiguration
4142
{
42-
private readonly EndpointCollection _endpoints = new EndpointCollection();
43+
private readonly EndpointCollection _endpoints;
4344
private readonly IList<IHandlingRule> _handlingRules = new List<IHandlingRule>();
4445
private readonly IList<ISendRule> _sendRules = new List<ISendRule>();
4546
private readonly IList<ISubscription> _subscriptions = new List<ISubscription>();
@@ -81,23 +82,49 @@ public class PlatibusConfiguration : IPlatibusConfiguration
8182
/// <inheritdoc />
8283
public SendOptions DefaultSendOptions { get; set; }
8384

85+
/// <inheritdoc />
86+
/// <summary>
87+
/// Initializes a new <see cref="T:Platibus.Config.PlatibusConfiguration" />
88+
/// </summary>
89+
public PlatibusConfiguration() : this(null, null)
90+
{
91+
}
92+
93+
/// <inheritdoc />
94+
/// <summary>
95+
/// Initializes a new <see cref="T:Platibus.Config.PlatibusConfiguration" /> with a preconfigured
96+
/// <paramref name="diagnosticService" />
97+
/// </summary>
98+
/// <param name="diagnosticService">(Optional) The service through which diagnostic events
99+
/// are reported and processed</param>
100+
/// <remarks>
101+
/// If a custom <paramref name="diagnosticService" /> is not specified, then the default
102+
/// singleton instance <see cref="F:Platibus.Diagnostics.DiagnosticService.DefaultInstance" /> will
103+
/// be used.
104+
/// </remarks>
105+
public PlatibusConfiguration(IDiagnosticService diagnosticService) : this(diagnosticService, null)
106+
{
107+
}
108+
84109
/// <summary>
85110
/// Initializes a new <see cref="PlatibusConfiguration"/> with a preconfigured
86-
/// <paramref name="diagnosticService"/>
111+
/// <paramref name="diagnosticService"/> and an initial set of <paramref name="endpoints"/>
87112
/// </summary>
88113
/// <param name="diagnosticService">(Optional) The service through which diagnostic events
89114
/// are reported and processed</param>
115+
/// <param name="endpoints">(Optional) An initial set of default endpoints</param>
90116
/// <remarks>
91117
/// If a custom <paramref name="diagnosticService"/> is not specified, then the default
92118
/// singleton instance <see cref="Diagnostics.DiagnosticService.DefaultInstance"/> will
93119
/// be used.
94120
/// </remarks>
95-
public PlatibusConfiguration(IDiagnosticService diagnosticService = null)
121+
public PlatibusConfiguration(IDiagnosticService diagnosticService, EndpointCollection endpoints)
96122
{
97123
MessageNamingService = new DefaultMessageNamingService();
98124
SerializationService = new DefaultSerializationService();
99125
DefaultContentType = "application/json";
100126
DiagnosticService = diagnosticService ?? Diagnostics.DiagnosticService.DefaultInstance;
127+
_endpoints = endpoints ?? new EndpointCollection();
101128
}
102129

103130
/// <summary>

Source/Platibus/EndpointCollection.cs

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828

2929
namespace Platibus
3030
{
31+
/// <inheritdoc />
3132
/// <summary>
32-
/// A mutable <see cref="IEndpointCollection"/> implementation
33+
/// A mutable <see cref="T:Platibus.IEndpointCollection" /> implementation
3334
/// </summary>
3435
public class EndpointCollection : IEndpointCollection
3536
{
37+
public static readonly IEndpointCollection Empty = new EndpointCollection();
38+
3639
private readonly IDictionary<EndpointName, IEndpoint> _endpoints = new Dictionary<EndpointName, IEndpoint>();
3740

3841
/// <summary>
@@ -44,21 +47,15 @@ public class EndpointCollection : IEndpointCollection
4447
/// or <paramref name="endpoint"/> are <c>null</c></exception>
4548
/// <exception cref="EndpointAlreadyExistsException">Thrown if there is already an
4649
/// endpoint with the specified <paramref name="endpointName"/></exception>
47-
public void Add(EndpointName endpointName, IEndpoint endpoint)
50+
public virtual void Add(EndpointName endpointName, IEndpoint endpoint)
4851
{
4952
if (endpointName == null) throw new ArgumentNullException(nameof(endpointName));
5053
if (_endpoints.ContainsKey(endpointName)) throw new EndpointAlreadyExistsException(endpointName);
5154
_endpoints[endpointName] = endpoint ?? throw new ArgumentNullException(nameof(endpoint));
5255
}
5356

54-
/// <summary>
55-
/// Returns the endpoint with the specified name
56-
/// </summary>
57-
/// <param name="endpointName">The name of the endpoint</param>
58-
/// <returns>Returns the endpoint</returns>
59-
/// <exception cref="EndpointNotFoundException">Thrown if there
60-
/// is no endpoint with the specified name</exception>
61-
public IEndpoint this[EndpointName endpointName]
57+
/// <inheritdoc />
58+
public virtual IEndpoint this[EndpointName endpointName]
6259
{
6360
get
6461
{
@@ -70,23 +67,16 @@ public IEndpoint this[EndpointName endpointName]
7067
}
7168
}
7269

73-
/// <summary>
74-
/// Tries to retrieve the endpoint with the specified address
75-
/// </summary>
76-
/// <param name="address">The address of the endpoint</param>
77-
/// <param name="endpoint">An output parameter that will be initialied
78-
/// with the endpoint if the endpoint is found</param>
79-
/// <returns>Returns <c>true</c> if the endpoint is found; <c>false</c>
80-
/// otherwise</returns>
81-
public bool TryGetEndpointByAddress(Uri address, out IEndpoint endpoint)
70+
/// <inheritdoc />
71+
public virtual bool TryGetEndpointByAddress(Uri address, out IEndpoint endpoint)
8272
{
8373
var comparer = new EndpointAddressEqualityComparer();
8474
endpoint = _endpoints.Values.FirstOrDefault(e => comparer.Equals(e.Address, address));
8575
return endpoint != null;
8676
}
8777

8878
/// <inheritdoc />
89-
public bool Contains(EndpointName endpointName)
79+
public virtual bool Contains(EndpointName endpointName)
9080
{
9181
return _endpoints.ContainsKey(endpointName);
9282
}
@@ -96,13 +86,7 @@ IEnumerator IEnumerable.GetEnumerator()
9686
return GetEnumerator();
9787
}
9888

99-
/// <summary>
100-
/// Returns an enumerator that iterates through the collection.
101-
/// </summary>
102-
/// <returns>
103-
/// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
104-
/// </returns>
105-
/// <filterpriority>1</filterpriority>
89+
/// <inheritdoc />
10690
public IEnumerator<KeyValuePair<EndpointName, IEndpoint>> GetEnumerator()
10791
{
10892
return _endpoints.GetEnumerator();

Source/Platibus/Http/HttpTransportService.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,7 @@ public HttpTransportService(HttpTransportServiceOptions options)
7171
if (options == null) throw new ArgumentNullException(nameof(options));
7272

7373
_baseUri = options.BaseUri.WithTrailingSlash();
74-
_endpoints = options.Endpoints == null
75-
? ReadOnlyEndpointCollection.Empty
76-
: new ReadOnlyEndpointCollection(options.Endpoints);
74+
_endpoints = options.Endpoints = options.Endpoints ?? EndpointCollection.Empty;
7775

7876
_messageQueueingService = options.MessageQueueingService;
7977
_messageJournal = options.MessageJournal;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System;
2+
3+
namespace Platibus
4+
{
5+
internal class LoopbackEndpoints : EndpointCollection
6+
{
7+
private readonly Uri _baseUri;
8+
9+
public LoopbackEndpoints(EndpointName name, Uri baseUri)
10+
{
11+
_baseUri = baseUri ?? throw new ArgumentNullException(nameof(baseUri));
12+
base.Add(name, new Endpoint(baseUri));
13+
}
14+
15+
public override void Add(EndpointName endpointName, IEndpoint endpoint)
16+
{
17+
}
18+
19+
public override IEndpoint this[EndpointName endpointName] => new Endpoint(_baseUri);
20+
21+
public override bool TryGetEndpointByAddress(Uri address, out IEndpoint endpoint)
22+
{
23+
endpoint = new Endpoint(_baseUri);
24+
return true;
25+
}
26+
27+
public override bool Contains(EndpointName endpointName)
28+
{
29+
return true;
30+
}
31+
32+
}
33+
}

0 commit comments

Comments
 (0)