Skip to content

Fix auto reconnection in query metadata #411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 49 additions & 16 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace RabbitMQ.Stream.Client.Reliable;

Expand Down Expand Up @@ -62,16 +64,32 @@ private async Task<IConsumer> StandardConsumer(bool boot)
if (IsClosedNormally(closeReason))
return;

await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
try
{
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
}
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
}
},
MetadataHandler = async _ =>
{
if (IsClosedNormally())
return;

await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
try
{
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
}
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
}
},
MessageHandler = async (consumer, ctx, message) =>
{
Expand Down Expand Up @@ -129,21 +147,36 @@ private async Task<IConsumer> SuperConsumer(bool boot)
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally(closeReason))
return;

var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
try
{
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
}
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Super stream consumer. ConnectionClosedHandler error. Auto recovery failed for stream: {_consumerConfig.Stream}");
}
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally())
return;

var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
try
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally())
return;

var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
}
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Super stream consumer.MetadataHandler error. Auto recovery failed stream: {_consumerConfig.Stream}");
}
},
MessageHandler = async (partitionStream, consumer, ctx, message) =>
{
Expand Down
35 changes: 26 additions & 9 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace RabbitMQ.Stream.Client.Reliable;

Expand Down Expand Up @@ -49,21 +51,36 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally(closeReason))
return;
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason))
.ConfigureAwait(false);
try
{
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
FromConnectionClosedReasonToStatusReason(closeReason))
.ConfigureAwait(false);
}
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Super stream producer. ConnectionClosedHandler error. Auto recovery failed for stream: {_producerConfig.Stream}");
}
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally())
return;

var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
try
{
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
}
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Super stream producer. MetadataHandler error. Auto recovery failed stream: {_producerConfig.Stream}");
}
},
ConfirmHandler = confirmationHandler =>
{
Expand Down
18 changes: 10 additions & 8 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,11 @@ private async Task Init(bool boot)
/// <param name="stream">stream name</param>
/// <param name="system">stream system</param>
/// <returns></returns>
private async Task<bool> CheckIfStreamIsAvailable(string stream, StreamSystem system)
private async Task<(bool, StreamInfo)> CheckIfStreamIsAvailable(string stream, StreamSystem system)
{
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
var exists = false;
StreamInfo streamInfo = default;
var tryAgain = true;
while (tryAgain)
{
Expand All @@ -286,7 +287,7 @@ private async Task<bool> CheckIfStreamIsAvailable(string stream, StreamSystem sy
{
// It is not enough to check that the stream exists;
// we also need to check that the stream has a leader.
var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
ClientExceptions.CheckLeader(streamInfo);
available += " and has a valid leader";
}
Expand All @@ -304,7 +305,7 @@ await _reliableConfig.ResourceAvailableReconnectStrategy
}

if (exists)
return true;
return (true, streamInfo);
// In this case the stream no longer exists, or checking whether it exists kept failing
// until the reconnection strategy ran out of attempts,
// so the entity is simply closed.
Expand All @@ -316,7 +317,7 @@ await _reliableConfig.ResourceAvailableReconnectStrategy
ToString()
);

return false;
return (false, default);
}

// <summary>
Expand Down Expand Up @@ -409,16 +410,17 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
Func<StreamInfo, Task> reconnectPartitionFunc, ChangeStatusReason reason)
{
var streamExists = false;

await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
UpdateStatus(ReliableEntityStatus.Reconnection, reason,
[stream]);
try
{
streamExists = await CheckIfStreamIsAvailable(stream, system)
var (localStreamExists, streamInfo) = await CheckIfStreamIsAvailable(stream, system)
.ConfigureAwait(false);
if (streamExists)
streamExists = localStreamExists;
if (streamExists && !streamInfo.Equals(default))
{
var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
await MaybeReconnectPartition(streamInfo, ToString(), reconnectPartitionFunc).ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -446,7 +448,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeSta
UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]);
try
{
streamExists = await CheckIfStreamIsAvailable(stream, system)
(streamExists, _) = await CheckIfStreamIsAvailable(stream, system)
.ConfigureAwait(false);
if (streamExists)
{
Expand Down