Skip to content

Commit e2140f2

Browse files
authored
Merge pull request #461 from CommunityToolkit/dev/messenger-observable
Add IObservable<T> extensions for IMessenger
2 parents 45795f2 + accfa0b commit e2140f2

File tree

7 files changed

+312
-2
lines changed

7 files changed

+312
-2
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Diagnostics.CodeAnalysis;
7+
using System.Linq;
8+
using System.Linq.Expressions;
9+
using System.Reflection;
10+
using System.Runtime.CompilerServices;
11+
using CommunityToolkit.Mvvm.Messaging.Internals;
12+
13+
namespace CommunityToolkit.Mvvm.Messaging;
14+
15+
/// <inheritdoc/>
16+
partial class IMessengerExtensions
17+
{
18+
/// <summary>
19+
/// Creates an <see cref="IObservable{T}"/> instance that can be used to be notified whenever a message of a given type is broadcast by a messenger.
20+
/// </summary>
21+
/// <typeparam name="TMessage">The type of message to use to receive notification for through the resulting <see cref="IObservable{T}"/> instance.</typeparam>
22+
/// <param name="messenger">The <see cref="IMessenger"/> instance to use to register the recipient.</param>
23+
/// <returns>An <see cref="IObservable{T}"/> instance to receive notifications for <typeparamref name="TMessage"/> messages being broadcast.</returns>
24+
/// <exception cref="System.ArgumentNullException">Thrown if <paramref name="messenger"/> is <see langword="null"/>.</exception>
25+
public static IObservable<TMessage> CreateObservable<TMessage>(this IMessenger messenger)
26+
where TMessage : class
27+
{
28+
ArgumentNullException.ThrowIfNull(messenger);
29+
30+
return new Observable<TMessage>(messenger);
31+
}
32+
33+
/// <summary>
34+
/// Creates an <see cref="IObservable{T}"/> instance that can be used to be notified whenever a message of a given type is broadcast by a messenger.
35+
/// </summary>
36+
/// <typeparam name="TMessage">The type of message to use to receive notification for through the resulting <see cref="IObservable{T}"/> instance.</typeparam>
37+
/// <typeparam name="TToken">The type of token to identify what channel to use to receive messages.</typeparam>
38+
/// <param name="messenger">The <see cref="IMessenger"/> instance to use to register the recipient.</param>
39+
/// <param name="token">A token used to determine the receiving channel to use.</param>
40+
/// <returns>An <see cref="IObservable{T}"/> instance to receive notifications for <typeparamref name="TMessage"/> messages being broadcast.</returns>
41+
/// <exception cref="System.ArgumentNullException">Thrown if <paramref name="messenger"/> or <paramref name="token"/> are <see langword="null"/>.</exception>
42+
public static IObservable<TMessage> CreateObservable<TMessage, TToken>(this IMessenger messenger, TToken token)
43+
where TMessage : class
44+
where TToken : IEquatable<TToken>
45+
{
46+
ArgumentNullException.ThrowIfNull(messenger);
47+
ArgumentNullException.For<TToken>.ThrowIfNull(token);
48+
49+
return new Observable<TMessage, TToken>(messenger, token);
50+
}
51+
52+
/// <summary>
53+
/// An <see cref="IObservable{T}"/> implementations for a given message type.
54+
/// </summary>
55+
/// <typeparam name="TMessage">The type of messages to listen to.</typeparam>
56+
private sealed class Observable<TMessage> : IObservable<TMessage>
57+
where TMessage : class
58+
{
59+
/// <summary>
60+
/// The <see cref="IMessenger"/> instance to use to register the recipient.
61+
/// </summary>
62+
private readonly IMessenger messenger;
63+
64+
/// <summary>
65+
/// Creates a new <see cref="Observable{TMessage}"/> instance with the given parameters.
66+
/// </summary>
67+
/// <param name="messenger">The <see cref="IMessenger"/> instance to use to register the recipient.</param>
68+
public Observable(IMessenger messenger)
69+
{
70+
this.messenger = messenger;
71+
}
72+
73+
/// <inheritdoc/>
74+
public IDisposable Subscribe(IObserver<TMessage> observer)
75+
{
76+
return new Recipient(this.messenger, observer);
77+
}
78+
79+
/// <summary>
80+
/// An <see cref="IRecipient{TMessage}"/> implementation for <see cref="Observable{TMessage}"/>.
81+
/// </summary>
82+
private sealed class Recipient : IRecipient<TMessage>, IDisposable
83+
{
84+
/// <summary>
85+
/// The <see cref="IMessenger"/> instance to use to register the recipient.
86+
/// </summary>
87+
private readonly IMessenger messenger;
88+
89+
/// <summary>
90+
/// The target <see cref="IObserver{T}"/> instance currently in use.
91+
/// </summary>
92+
private readonly IObserver<TMessage> observer;
93+
94+
/// <summary>
95+
/// Creates a new <see cref="Recipient"/> instance with the specified parameters.
96+
/// </summary>
97+
/// <param name="messenger">The <see cref="IMessenger"/> instance to use to register the recipient.</param>
98+
/// <param name="observer">The <see cref="IObserver{T}"/> instance to use to create the recipient for.</param>
99+
public Recipient(IMessenger messenger, IObserver<TMessage> observer)
100+
{
101+
this.messenger = messenger;
102+
this.observer = observer;
103+
104+
messenger.Register(this);
105+
}
106+
107+
/// <inheritdoc/>
108+
public void Receive(TMessage message)
109+
{
110+
this.observer.OnNext(message);
111+
}
112+
113+
/// <inheritdoc/>
114+
public void Dispose()
115+
{
116+
this.messenger.Unregister<TMessage>(this);
117+
}
118+
}
119+
}
120+
121+
/// <summary>
122+
/// An <see cref="IObservable{T}"/> implementations for a given pair of message and token types.
123+
/// </summary>
124+
/// <typeparam name="TMessage">The type of messages to listen to.</typeparam>
125+
/// <typeparam name="TToken">The type of token to identify what channel to use to receive messages.</typeparam>
126+
private sealed class Observable<TMessage, TToken> : IObservable<TMessage>
127+
where TMessage : class
128+
where TToken : IEquatable<TToken>
129+
{
130+
/// <summary>
131+
/// The <see cref="IMessenger"/> instance to use to register the recipient.
132+
/// </summary>
133+
private readonly IMessenger messenger;
134+
135+
/// <summary>
136+
/// The token used to determine the receiving channel to use.
137+
/// </summary>
138+
private readonly TToken token;
139+
140+
/// <summary>
141+
/// Creates a new <see cref="Observable{TMessage, TToken}"/> instance with the given parameters.
142+
/// </summary>
143+
/// <param name="messenger">The <see cref="IMessenger"/> instance to use to register the recipient.</param>
144+
/// <param name="token">A token used to determine the receiving channel to use.</param>
145+
public Observable(IMessenger messenger, TToken token)
146+
{
147+
this.messenger = messenger;
148+
this.token = token;
149+
}
150+
151+
/// <inheritdoc/>
152+
public IDisposable Subscribe(IObserver<TMessage> observer)
153+
{
154+
return new Recipient(this.messenger, observer, this.token);
155+
}
156+
157+
/// <summary>
158+
/// An <see cref="IRecipient{TMessage}"/> implementation for <see cref="Observable{TMessage, TToken}"/>.
159+
/// </summary>
160+
private sealed class Recipient : IRecipient<TMessage>, IDisposable
161+
{
162+
/// <summary>
163+
/// The <see cref="IMessenger"/> instance to use to register the recipient.
164+
/// </summary>
165+
private readonly IMessenger messenger;
166+
167+
/// <summary>
168+
/// The target <see cref="IObserver{T}"/> instance currently in use.
169+
/// </summary>
170+
private readonly IObserver<TMessage> observer;
171+
172+
/// <summary>
173+
/// The token used to determine the receiving channel to use.
174+
/// </summary>
175+
private readonly TToken token;
176+
177+
/// <summary>
178+
/// Creates a new <see cref="Recipient"/> instance with the specified parameters.
179+
/// </summary>
180+
/// <param name="messenger">The <see cref="IMessenger"/> instance to use to register the recipient.</param>
181+
/// <param name="observer">The <see cref="IObserver{T}"/> instance to use to create the recipient for.</param>
182+
/// <param name="token">A token used to determine the receiving channel to use.</param>
183+
public Recipient(IMessenger messenger, IObserver<TMessage> observer, TToken token)
184+
{
185+
this.messenger = messenger;
186+
this.observer = observer;
187+
this.token = token;
188+
189+
messenger.Register(this, token);
190+
}
191+
192+
/// <inheritdoc/>
193+
public void Receive(TMessage message)
194+
{
195+
this.observer.OnNext(message);
196+
}
197+
198+
/// <inheritdoc/>
199+
public void Dispose()
200+
{
201+
this.messenger.Unregister<TMessage, TToken>(this, this.token);
202+
}
203+
}
204+
}
205+
}

CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace CommunityToolkit.Mvvm.Messaging;
1515
/// <summary>
1616
/// Extensions for the <see cref="IMessenger"/> type.
1717
/// </summary>
18-
public static class IMessengerExtensions
18+
public static partial class IMessengerExtensions
1919
{
2020
/// <summary>
2121
/// A class that acts as a container to load the <see cref="MethodInfo"/> instance linked to

tests/CommunityToolkit.Mvvm.Roslyn401.UnitTests/CommunityToolkit.Mvvm.Roslyn401.UnitTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<PackageReference Include="MSTest.TestAdapter" Version="2.2.10" />
1010
<PackageReference Include="MSTest.TestFramework" Version="2.2.10" />
1111
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
12+
<PackageReference Include="System.Reactive" Version="5.0.0" />
1213
<PackageReference Include="System.Text.Json" Version="6.0.6" />
1314
</ItemGroup>
1415

tests/CommunityToolkit.Mvvm.Roslyn431.UnitTests/CommunityToolkit.Mvvm.Roslyn431.UnitTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<PackageReference Include="MSTest.TestAdapter" Version="2.2.10" />
1010
<PackageReference Include="MSTest.TestFramework" Version="2.2.10" />
1111
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
12+
<PackageReference Include="System.Reactive" Version="5.0.0" />
1213
<PackageReference Include="System.Text.Json" Version="6.0.6" />
1314
</ItemGroup>
1415

tests/CommunityToolkit.Mvvm.UnitTests/CommunityToolkit.Mvvm.UnitTests.projitems

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<Compile Include="$(MSBuildThisFileDirectory)Test_AsyncRelayCommand{T}.cs" />
2626
<Compile Include="$(MSBuildThisFileDirectory)Test_INotifyPropertyChangedAttribute.cs" />
2727
<Compile Include="$(MSBuildThisFileDirectory)Test_IRecipientGenerator.cs" />
28+
<Compile Include="$(MSBuildThisFileDirectory)Test_Messenger.Observables.cs" />
2829
<Compile Include="$(MSBuildThisFileDirectory)Test_Messenger.cs" />
2930
<Compile Include="$(MSBuildThisFileDirectory)Test_Messenger.Request.cs" />
3031
<Compile Include="$(MSBuildThisFileDirectory)Test_ObservableObject.cs" />
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using CommunityToolkit.Mvvm.Messaging;
8+
using Microsoft.VisualStudio.TestTools.UnitTesting;
9+
10+
namespace CommunityToolkit.Mvvm.UnitTests;
11+
12+
partial class Test_Messenger
13+
{
14+
[TestMethod]
15+
[DataRow(typeof(StrongReferenceMessenger))]
16+
[DataRow(typeof(WeakReferenceMessenger))]
17+
public void Test_Messenger_CreateObservable(Type type)
18+
{
19+
IMessenger messenger = (IMessenger)Activator.CreateInstance(type)!;
20+
21+
IObservable<MessageA> observable = messenger.CreateObservable<MessageA>();
22+
23+
Assert.IsNotNull(observable);
24+
25+
List<MessageA> messages = new();
26+
27+
IDisposable disposable = observable.Subscribe(messages.Add);
28+
29+
MessageA message1 = new();
30+
MessageA message2 = new();
31+
32+
_ = messenger.Send(message1);
33+
_ = messenger.Send(message2);
34+
35+
// The expected messages have been observed
36+
CollectionAssert.AreEqual(messages, new[] { message1, message2 });
37+
38+
disposable.Dispose();
39+
40+
_ = messenger.Send<MessageA>();
41+
42+
// No messages are sent after unsubscribing the observable
43+
CollectionAssert.AreEqual(messages, new[] { message1, message2 });
44+
}
45+
46+
[TestMethod]
47+
[DataRow(typeof(StrongReferenceMessenger))]
48+
[DataRow(typeof(WeakReferenceMessenger))]
49+
public void Test_Messenger_CreateObservable_WithToken(Type type)
50+
{
51+
IMessenger messenger = (IMessenger)Activator.CreateInstance(type)!;
52+
53+
IObservable<MessageA> observable = messenger.CreateObservable<MessageA, int>(42);
54+
55+
Assert.IsNotNull(observable);
56+
57+
List<MessageA> messages = new();
58+
59+
IDisposable disposable = observable.Subscribe(messages.Add);
60+
61+
MessageA message1 = new();
62+
MessageA message2 = new();
63+
64+
_ = messenger.Send(message1, 42);
65+
_ = messenger.Send(message2, 42);
66+
67+
_ = messenger.Send(new MessageA(), 1);
68+
_ = messenger.Send(new MessageA(), 999);
69+
70+
// The expected messages have been observed (only for matching tokens)
71+
CollectionAssert.AreEqual(messages, new[] { message1, message2 });
72+
73+
disposable.Dispose();
74+
75+
_ = messenger.Send(new MessageA(), 42);
76+
_ = messenger.Send(new MessageA(), 1);
77+
78+
// No messages are sent after unsubscribing the observable (regardless of token)
79+
CollectionAssert.AreEqual(messages, new[] { message1, message2 });
80+
}
81+
82+
[TestMethod]
83+
[ExpectedException(typeof(ArgumentNullException))]
84+
public void Test_Messenger_CreateObservable_NullMessenger()
85+
{
86+
_ = IMessengerExtensions.CreateObservable<MessageA>(null!);
87+
}
88+
89+
[TestMethod]
90+
[ExpectedException(typeof(ArgumentNullException))]
91+
public void Test_Messenger_CreateObservable_WithToken_NullMessenger()
92+
{
93+
_ = IMessengerExtensions.CreateObservable<MessageA, string>(null!, "Hello");
94+
}
95+
96+
[TestMethod]
97+
[ExpectedException(typeof(ArgumentNullException))]
98+
public void Test_Messenger_CreateObservable_WithToken_NullToken()
99+
{
100+
_ = IMessengerExtensions.CreateObservable<MessageA, string>(new WeakReferenceMessenger(), null!);
101+
}
102+
}

tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Request.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace CommunityToolkit.Mvvm.UnitTests;
1414

15-
public partial class Test_Messenger
15+
partial class Test_Messenger
1616
{
1717
[TestMethod]
1818
[DataRow(typeof(StrongReferenceMessenger))]

0 commit comments

Comments
 (0)