Skip to content

Commit cfda0eb

Browse files
committed
Improved error handling around message queue initialization
1 parent f64c89a commit cfda0eb

File tree

6 files changed

+69
-13
lines changed

6 files changed

+69
-13
lines changed

Source/Platibus.MongoDB/MongoDBEventType.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,11 @@ public class MongoDBEventType
1616
/// Thrown when an index creation operation fails
1717
/// </summary>
1818
public static readonly DiagnosticEventType IndexCreationFailed = new DiagnosticEventType("IndexCreationFailed", DiagnosticEventLevel.Warn);
19+
20+
/// <summary>
21+
/// Emitted when message headers or content cannot be read from the database
22+
/// </summary>
23+
public static readonly DiagnosticEventType MessageDocumentFormatError = new DiagnosticEventType("MessageDocumentFormatError", DiagnosticEventLevel.Error);
24+
1925
}
2026
}

Source/Platibus.MongoDB/MongoDBMessageQueue.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ private static string MapToCollectionName(QueueName queueName)
200200
return _queuedMessages.UpdateOneAsync(filter, update, cancellationToken: cancellationToken);
201201
}
202202

203+
/// <inheritdoc />
203204
/// <summary>
204205
/// Returns messages in the queue that are pending
205206
/// </summary>
@@ -216,10 +217,23 @@ private static string MapToCollectionName(QueueName queueName)
216217
var queuedMessages = new List<QueuedMessage>();
217218
foreach (var queuedMessage in existingMessages)
218219
{
219-
var messageHeaders = new MessageHeaders(queuedMessage.Headers);
220-
var principal = await _securityTokenService.NullSafeValidate(messageHeaders.SecurityToken);
221-
var message = new Message(messageHeaders, queuedMessage.Content).WithoutSecurityToken();
222-
queuedMessages.Add(new QueuedMessage(message, principal, queuedMessage.Attempts));
220+
try
221+
{
222+
var messageHeaders = new MessageHeaders(queuedMessage.Headers);
223+
var principal = await _securityTokenService.NullSafeValidate(messageHeaders.SecurityToken);
224+
var message = new Message(messageHeaders, queuedMessage.Content).WithoutSecurityToken();
225+
queuedMessages.Add(new QueuedMessage(message, principal, queuedMessage.Attempts));
226+
}
227+
catch (Exception ex)
228+
{
229+
DiagnosticService.Emit(new MongoDBEventBuilder(this, MongoDBEventType.MessageDocumentFormatError)
230+
{
231+
Detail = "Error reading previously queued message document ID " + queuedMessage.Id + "; skipping",
232+
CollectionName = _queuedMessages.CollectionNamespace.CollectionName,
233+
DatabaseName = _queuedMessages.Database.DatabaseNamespace.DatabaseName,
234+
Exception = ex
235+
}.Build());
236+
}
223237
}
224238
return queuedMessages;
225239
}

Source/Platibus/Diagnostics/DiagnosticEventType.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ public class DiagnosticEventType
3636
/// </summary>
3737
public static readonly DiagnosticEventType ComponentInitialization = new DiagnosticEventType("ComponentInitialization", DiagnosticEventLevel.Info);
3838

39+
/// <summary>
40+
/// Emitted when an error occurs during component or service initialization
41+
/// </summary>
42+
public static readonly DiagnosticEventType ComponentInitializationError = new DiagnosticEventType("ComponentInitializationError", DiagnosticEventLevel.Error);
43+
44+
3945
/// <summary>
4046
/// Emitted when a default configuration is used
4147
/// </summary>

Source/Platibus/Queueing/AbstractMessageQueue.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
namespace Platibus.Queueing
3232
{
33+
/// <inheritdoc />
3334
/// <summary>
3435
/// An abstract base class for implementing message queues
3536
/// </summary>
@@ -123,7 +124,19 @@ protected AbstractMessageQueue(QueueName queueName, IQueueListener listener, Que
123124
{
124125
if (Interlocked.Exchange(ref _initialized, 1) == 0)
125126
{
126-
await EnqueueExistingMessages(cancellationToken);
127+
try
128+
{
129+
await EnqueueExistingMessages(cancellationToken);
130+
}
131+
catch (Exception ex)
132+
{
133+
DiagnosticService.Emit(
134+
new DiagnosticEventBuilder(this, DiagnosticEventType.ComponentInitializationError)
135+
{
136+
Detail = "Error enqueueing previously queued message(s)",
137+
Exception = ex
138+
}.Build());
139+
}
127140
}
128141
}
129142

Source/Platibus/SQL/SQLEventType.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,11 @@ public static class SQLEventType
3131
/// Emitted when there is an error opening or closing a connection
3232
/// </summary>
3333
public static readonly DiagnosticEventType CommandError = new DiagnosticEventType("CommandError", DiagnosticEventLevel.Error);
34+
35+
/// <summary>
36+
/// Emitted when message headers or content cannot be read from the database
37+
/// </summary>
38+
public static readonly DiagnosticEventType MessageRecordFormatError = new DiagnosticEventType("MessageFormatError", DiagnosticEventLevel.Error);
39+
3440
}
3541
}

Source/Platibus/SQL/SQLMessageQueue.cs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,27 @@ await DiagnosticService.EmitAsync(
148148
{
149149
while (await reader.ReadAsync(cancellationToken))
150150
{
151-
var record = commandBuilder.BuildQueuedMessageRecord(reader);
152-
var messageContent = record.Content;
153-
var headers = DeserializeHeaders(record.Headers);
151+
try
152+
{
153+
var record = commandBuilder.BuildQueuedMessageRecord(reader);
154+
var messageContent = record.Content;
155+
var headers = DeserializeHeaders(record.Headers);
154156
#pragma warning disable 612
155-
var principal = await ResolvePrincipal(headers, record.SenderPrincipal);
157+
var principal = await ResolvePrincipal(headers, record.SenderPrincipal);
156158
#pragma warning restore 612
157-
var message = new Message(headers, messageContent).WithoutSecurityToken();
158-
var attempts = record.Attempts;
159-
var queuedMessage = new QueuedMessage(message, principal, attempts);
160-
queuedMessages.Add(queuedMessage);
159+
var message = new Message(headers, messageContent).WithoutSecurityToken();
160+
var attempts = record.Attempts;
161+
var queuedMessage = new QueuedMessage(message, principal, attempts);
162+
queuedMessages.Add(queuedMessage);
163+
}
164+
catch (Exception ex)
165+
{
166+
DiagnosticService.Emit(new SQLEventBuilder(this, SQLEventType.MessageRecordFormatError)
167+
{
168+
Detail = "Error reading previously queued message record; skipping",
169+
Exception = ex
170+
}.Build());
171+
}
161172
}
162173
}
163174
}

0 commit comments

Comments
 (0)