Skip to content

Commit 9155462

Browse files
committed
updated json deserialization for messages
1 parent bb677fb commit 9155462

File tree

1 file changed

+62
-50
lines changed

1 file changed

+62
-50
lines changed

src/EventBusRabbitMq.cs

Lines changed: 62 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,30 @@ public void Publish(IntegrationEvent @event)
4141

4242
_logger.Debug("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);
4343

44-
using (var channel = _persistentConnection.CreateModel())
45-
{
46-
_logger.Debug("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
47-
48-
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
49-
50-
var message = JsonSerializer.Serialize(@event);
51-
var body = Encoding.UTF8.GetBytes(message);
44+
using var channel = _persistentConnection.CreateModel();
45+
_logger.Debug("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
5246

53-
policy.Execute(() =>
54-
{
55-
var properties = channel.CreateBasicProperties();
56-
properties.DeliveryMode = 2; // persistent
47+
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
5748

58-
_logger.Debug("Publishing event to RabbitMQ: {EventId}", @event.Id);
49+
var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions
50+
{
51+
WriteIndented = true
52+
});
5953

60-
channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
61-
});
62-
}
54+
policy.Execute(() =>
55+
{
56+
var properties = channel.CreateBasicProperties();
57+
properties.DeliveryMode = 2; // persistent
58+
59+
_logger.Debug("Publishing event to RabbitMQ: {EventId}", @event.Id);
60+
61+
channel.BasicPublish(
62+
exchange: BROKER_NAME,
63+
routingKey: eventName,
64+
mandatory: true,
65+
basicProperties: properties,
66+
body: body);
67+
});
6368
}
6469

6570
public void Publish(IEnumerable<IntegrationEvent> events)
@@ -94,12 +99,16 @@ public void Subscribe<T, TH>()
9499
private void DoInternalSubscription(string eventName)
95100
{
96101
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
97-
if (containsKey) return;
102+
103+
if (containsKey)
104+
return;
105+
98106
if (!_persistentConnection.IsConnected)
99107
_persistentConnection.TryConnect();
100108

101-
using var channel = _persistentConnection.CreateModel();
102-
channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
109+
_consumerChannel.QueueBind(queue: _queueName,
110+
exchange: BROKER_NAME,
111+
routingKey: eventName);
103112
}
104113

105114
public void Unsubscribe<T, TH>()
@@ -123,13 +132,11 @@ private void SubsManager_OnEventRemoved(object sender, string eventName)
123132
if (!_persistentConnection.IsConnected)
124133
_persistentConnection.TryConnect();
125134

126-
using (var channel = _persistentConnection.CreateModel())
127-
{
128-
channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
129-
if (!_subsManager.IsEmpty) return;
130-
_queueName = string.Empty;
131-
_consumerChannel.Close();
132-
}
135+
using var channel = _persistentConnection.CreateModel();
136+
channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
137+
if (!_subsManager.IsEmpty) return;
138+
_queueName = string.Empty;
139+
_consumerChannel.Close();
133140
}
134141
private IModel CreateConsumerChannel()
135142
{
@@ -139,8 +146,11 @@ private IModel CreateConsumerChannel()
139146
_logger.Debug("Creating RabbitMQ consumer channel");
140147

141148
var channel = _persistentConnection.CreateModel();
149+
142150
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
151+
143152
channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
153+
144154
channel.CallbackException += (sender, ea) =>
145155
{
146156
_logger.Warning(ea.Exception, "Recreating RabbitMQ consumer channel");
@@ -159,8 +169,13 @@ private void StartBasicConsume()
159169
if (_consumerChannel is not null)
160170
{
161171
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
172+
162173
consumer.Received += Consumer_Received;
163-
_consumerChannel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
174+
175+
_consumerChannel.BasicConsume(
176+
queue: _queueName,
177+
autoAck: false,
178+
consumer: consumer);
164179
}
165180
else
166181
{
@@ -197,31 +212,28 @@ private async Task ProcessEvent(string eventName, string message)
197212

198213
if (_subsManager.HasSubscriptionsForEvent(eventName))
199214
{
200-
using (var scope = _serviceProvider.CreateScope())
215+
using var scope = _serviceProvider.CreateScope();
216+
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
217+
foreach (var subscription in subscriptions)
201218
{
202-
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
203-
foreach (var subscription in subscriptions)
219+
if (subscription.IsDynamic)
220+
{
221+
var handler = scope.ServiceProvider.GetService(subscription.HandlerType) as IDynamicIntegrationEventHandler;
222+
if (handler == null) continue;
223+
using dynamic eventData = JsonDocument.Parse(message);
224+
await Task.Yield();
225+
await handler.Handle(eventData);
226+
}
227+
else
204228
{
205-
if (subscription.IsDynamic)
206-
{
207-
var handler = scope.ServiceProvider.GetService(subscription.HandlerType) as IDynamicIntegrationEventHandler;
208-
if (handler == null) continue;
209-
var eventData = JsonSerializer.Deserialize<object>(message);
210-
211-
await Task.Yield();
212-
await handler.Handle(eventData);
213-
}
214-
else
215-
{
216-
var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
217-
if (handler == null) continue;
218-
var eventType = _subsManager.GetEventTypeByName(eventName);
219-
var integrationEvent = JsonSerializer.Deserialize(message, eventType);
220-
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
221-
222-
await Task.Yield();
223-
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
224-
}
229+
var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
230+
if (handler == null) continue;
231+
var eventType = _subsManager.GetEventTypeByName(eventName);
232+
var integrationEvent = JsonSerializer.Deserialize(message, eventType);
233+
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
234+
235+
await Task.Yield();
236+
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
225237
}
226238
}
227239
}

0 commit comments

Comments
 (0)