Skip to content

Commit 2f4ac17

Browse files
Merge pull request #275 from Avanade/feat/newcartridges
fix: RabbitMqConsumer nullcheck.
2 parents b120340 + 6471171 commit 2f4ac17

File tree

3 files changed

+113
-10
lines changed

3 files changed

+113
-10
lines changed

src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<Copyright>Avanade 2019</Copyright>
1010
<PackageProjectUrl>https://github.com/Avanade/Liquid-Application-Framework</PackageProjectUrl>
1111
<PackageIcon>logo.png</PackageIcon>
12-
<Version>8.0.0</Version>
12+
<Version>8.0.1</Version>
1313
<GenerateDocumentationFile>true</GenerateDocumentationFile>
1414
<Description>
1515
The Liquid.Messaging.RabbitMq provides producer and consumer patterns to allow the send and consumption of Messaging inside your microservice.

src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class RabbitMqConsumer<TEntity> : ILiquidConsumer<TEntity>
2424
private readonly bool _autoAck;
2525
private IModel _channelModel;
2626
private readonly IRabbitMqFactory _factory;
27-
private readonly RabbitMqConsumerSettings _settings;
27+
private readonly IOptions<RabbitMqConsumerSettings> _settings;
2828

2929
///<inheritdoc/>
3030
public event Func<ConsumerMessageEventArgs<TEntity>, CancellationToken, Task> ConsumeMessageAsync;
@@ -40,9 +40,14 @@ public class RabbitMqConsumer<TEntity> : ILiquidConsumer<TEntity>
4040
public RabbitMqConsumer(IRabbitMqFactory factory, IOptions<RabbitMqConsumerSettings> settings)
4141
{
4242
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
43-
_settings = settings?.Value ?? throw new ArgumentNullException(nameof(settings));
43+
_settings = settings ?? throw new ArgumentNullException(nameof(settings));
4444

45-
_autoAck = _settings.AdvancedSettings?.AutoAck ?? true;
45+
if (_settings.Value == null)
46+
{
47+
throw new ArgumentNullException(nameof(_settings.Value), "The settings value must be set.");
48+
}
49+
50+
_autoAck = _settings.Value.AdvancedSettings?.AutoAck ?? true;
4651
}
4752

4853
///<inheritdoc/>
@@ -53,14 +58,14 @@ public async Task RegisterMessageHandler(CancellationToken cancellationToken = d
5358
throw new NotImplementedException($"The {nameof(ConsumeMessageAsync)} action must be added to class.");
5459
}
5560

56-
_channelModel = _factory.GetReceiver(_settings);
61+
_channelModel = _factory.GetReceiver(_settings.Value);
5762

5863
var consumer = new EventingBasicConsumer(_channelModel);
5964

6065

6166
consumer.Received += async (model, deliverEvent) => { await MessageHandler(deliverEvent, cancellationToken); };
6267

63-
_channelModel.BasicConsume(_settings.Queue, _autoAck, consumer);
68+
_channelModel.BasicConsume(_settings.Value.Queue, _autoAck, consumer);
6469
}
6570

6671
/// <summary>
@@ -83,7 +88,7 @@ protected async Task MessageHandler(BasicDeliverEventArgs deliverEvent, Cancella
8388
{
8489
if (!_autoAck)
8590
{
86-
var queueAckMode = _settings.AdvancedSettings.QueueAckModeSettings ?? new QueueAckModeSettings() { QueueAckMode = QueueAckModeEnum.BasicAck, Requeue = true };
91+
var queueAckMode = _settings.Value.AdvancedSettings.QueueAckModeSettings ?? new QueueAckModeSettings() { QueueAckMode = QueueAckModeEnum.BasicAck, Requeue = true };
8792

8893
switch (queueAckMode.QueueAckMode)
8994
{

test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,55 @@ namespace Liquid.Messaging.RabbitMq.Tests
1616
public class RabbitMqConsumerTest : RabbitMqConsumer<MessageMock>
1717
{
1818
public static readonly IRabbitMqFactory _factory = Substitute.For<IRabbitMqFactory>();
19-
public static readonly IOptions<RabbitMqConsumerSettings> _settings = GetOptions();
20-
19+
public static IOptions<RabbitMqConsumerSettings> _settings = GetOptions();
20+
public static RabbitMqConsumerSettings _settingsValue;
2121
public static IOptions<RabbitMqConsumerSettings> GetOptions()
2222
{
2323
var settings = Substitute.For<IOptions<RabbitMqConsumerSettings>>();
24-
settings.Value.Returns(new RabbitMqConsumerSettings());
24+
25+
_settingsValue = new RabbitMqConsumerSettings
26+
{
27+
CompressMessage = true,
28+
Exchange = "test",
29+
Queue = "test",
30+
AdvancedSettings = new AdvancedSettings
31+
{
32+
AutoAck = false,
33+
QueueAckModeSettings = new QueueAckModeSettings() { QueueAckMode = QueueAckModeEnum.BasicAck, Requeue = true }
34+
}
35+
};
36+
settings.Value.Returns(_settingsValue);
2537
return settings;
2638
}
2739

2840
public RabbitMqConsumerTest()
2941
: base(_factory, _settings)
3042
{
43+
var model = Substitute.For<IModel>();
44+
_factory.GetReceiver(Arg.Any<RabbitMqConsumerSettings>()).Returns(model);
45+
}
46+
47+
[Fact]
48+
public void Constructor_WhenFactoryIsNull_ThrowArgumentNullException()
49+
{
50+
Assert.Throws<ArgumentNullException>(() => new RabbitMqConsumer<MessageMock>(null, _settings));
3151
}
3252

53+
[Fact]
54+
public void Constructor_WhenSettingsIsNull_ThrowArgumentNullException()
55+
{
56+
Assert.Throws<ArgumentNullException>(() => new RabbitMqConsumer<MessageMock>(_factory, null));
57+
}
58+
59+
[Fact]
60+
public void Constructor_WhenSettingsValueIsNull_ThrowArgumentNullException()
61+
{
62+
var settings = Substitute.For<IOptions<RabbitMqConsumerSettings>>();
63+
settings.Value.Returns((RabbitMqConsumerSettings)null);
64+
Assert.Throws<ArgumentNullException>(() => new RabbitMqConsumer<MessageMock>(_factory, settings));
65+
}
66+
67+
3368
[Fact]
3469
public void RegisterMessageHandler_WhenRegisteredSucessfully_BasicConsumeReceivedCall()
3570
{
@@ -66,10 +101,73 @@ public async Task MessageHandler_WhenProcessExecutedSucessfully()
66101
_factory.GetReceiver(Arg.Any<RabbitMqConsumerSettings>()).Returns(messageReceiver);
67102

68103
ConsumeMessageAsync += ProcessMessageAsyncMock;
104+
await RegisterMessageHandler();
105+
await MessageHandler(message, new CancellationToken());
106+
}
107+
108+
[Fact]
109+
public async Task MessageHandler_CallsConsumeMessageAsync_AndAcks_WhenAutoAckIsFalse()
110+
{
111+
var message = new BasicDeliverEventArgs();
112+
var entity = new MessageMock() { TestMessageId = 1 };
113+
message.Body = entity.ToJsonBytes();
114+
var messageReceiver = Substitute.For<IModel>();
115+
ConsumeMessageAsync += ProcessMessageAsyncMock;
116+
_factory.GetReceiver(Arg.Any<RabbitMqConsumerSettings>()).Returns(messageReceiver);
117+
118+
await RegisterMessageHandler();
119+
120+
await MessageHandler(message, new CancellationToken());
121+
messageReceiver.Received(1).BasicAck(message.DeliveryTag, false);
69122

123+
}
124+
125+
[Fact]
126+
public async Task MessageHandler_CallsConsumeMessageAsync_AndNacks_WhenAutoAckIsFalse()
127+
{
128+
var message = new BasicDeliverEventArgs();
129+
var entity = new MessageMock() { TestMessageId = 2 };
130+
message.Body = entity.ToJsonBytes();
131+
var messageReceiver = Substitute.For<IModel>();
132+
ConsumeMessageAsync += ProcessMessageAsyncMock;
133+
_factory.GetReceiver(Arg.Any<RabbitMqConsumerSettings>()).Returns(messageReceiver);
134+
135+
await RegisterMessageHandler();
70136
await MessageHandler(message, new CancellationToken());
137+
messageReceiver.Received(1).BasicNack(message.DeliveryTag, false, true);
71138
}
72139

140+
[Fact]
141+
public async Task MessageHandler_CallsConsumeMessageAsync_AndRejects_WhenAutoAckIsFalse()
142+
{
143+
var message = new BasicDeliverEventArgs();
144+
var entity = new MessageMock() { TestMessageId = 2 };
145+
message.Body = entity.ToJsonBytes();
146+
var messageReceiver = Substitute.For<IModel>();
147+
ConsumeMessageAsync += ProcessMessageAsyncMock;
148+
149+
_factory.GetReceiver(Arg.Any<RabbitMqConsumerSettings>()).Returns(messageReceiver);
150+
151+
_settingsValue = new RabbitMqConsumerSettings
152+
{
153+
CompressMessage = true,
154+
Exchange = "test",
155+
Queue = "test",
156+
AdvancedSettings = new AdvancedSettings
157+
{
158+
AutoAck = false,
159+
QueueAckModeSettings = new QueueAckModeSettings() { QueueAckMode = QueueAckModeEnum.BasicReject, Requeue = true }
160+
}
161+
};
162+
163+
_settings.Value.Returns(_settingsValue);
164+
165+
await RegisterMessageHandler();
166+
167+
await MessageHandler(message, new CancellationToken());
168+
messageReceiver.Received(1).BasicReject(message.DeliveryTag, true);
169+
}
170+
73171
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
74172
private async Task ProcessMessageAsyncMock(ConsumerMessageEventArgs<MessageMock> args, CancellationToken cancellationToken)
75173
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

0 commit comments

Comments
 (0)