-
Notifications
You must be signed in to change notification settings - Fork 1.3k
CSHARP-3537: CSOT: retryable reads and writes #1717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a76715e
3e5a4cd
6cf58a2
dc2d1ea
fefc460
e6b11d9
d936f03
cbb79b8
25ced32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,9 +14,11 @@ | |
*/ | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using MongoDB.Driver.Core.Bindings; | ||
using MongoDB.Driver.Core.Misc; | ||
using MongoDB.Driver.Core.Servers; | ||
|
||
namespace MongoDB.Driver.Core.Operations | ||
{ | ||
|
@@ -29,41 +31,33 @@ public static RetryableReadContext Create(OperationContext operationContext, IRe | |
var context = new RetryableReadContext(binding, retryRequested); | ||
try | ||
{ | ||
context.Initialize(operationContext); | ||
|
||
ChannelPinningHelper.PinChannellIfRequired( | ||
context.ChannelSource, | ||
context.Channel, | ||
context.Binding.Session); | ||
|
||
return context; | ||
context.Initialize(operationContext, null); | ||
} | ||
catch | ||
{ | ||
context.Dispose(); | ||
throw; | ||
} | ||
|
||
ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session); | ||
return context; | ||
} | ||
|
||
public static async Task<RetryableReadContext> CreateAsync(OperationContext operationContext, IReadBinding binding, bool retryRequested) | ||
{ | ||
var context = new RetryableReadContext(binding, retryRequested); | ||
try | ||
{ | ||
await context.InitializeAsync(operationContext).ConfigureAwait(false); | ||
|
||
ChannelPinningHelper.PinChannellIfRequired( | ||
context.ChannelSource, | ||
context.Channel, | ||
context.Binding.Session); | ||
|
||
return context; | ||
await context.InitializeAsync(operationContext, null).ConfigureAwait(false); | ||
} | ||
catch | ||
{ | ||
context.Dispose(); | ||
throw; | ||
} | ||
|
||
ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session); | ||
return context; | ||
} | ||
#endregion | ||
|
||
|
@@ -96,50 +90,72 @@ public void Dispose() | |
} | ||
} | ||
|
||
public void ReplaceChannel(IChannelHandle channel) | ||
internal void Initialize(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers) | ||
{ | ||
var attempt = 1; | ||
while (true) | ||
{ | ||
try | ||
{ | ||
ReplaceChannelSource(Binding.GetReadChannelSource(operationContext, deprioritizedServers)); | ||
ReplaceChannel(ChannelSource.GetChannel(operationContext)); | ||
return; | ||
} | ||
catch (Exception ex) | ||
{ | ||
var innerException = ex is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : ex; | ||
if (RetryableReadOperationExecutor.ShouldRetryOperation(operationContext, this, innerException, attempt)) | ||
{ | ||
attempt++; | ||
} | ||
else | ||
{ | ||
throw; | ||
} | ||
} | ||
} | ||
} | ||
|
||
internal async Task InitializeAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers) | ||
{ | ||
var attempt = 1; | ||
while (true) | ||
{ | ||
try | ||
{ | ||
ReplaceChannelSource(await Binding.GetReadChannelSourceAsync(operationContext, deprioritizedServers).ConfigureAwait(false)); | ||
ReplaceChannel(await ChannelSource.GetChannelAsync(operationContext).ConfigureAwait(false)); | ||
return; | ||
} | ||
catch (Exception ex) | ||
{ | ||
var innerException = ex is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : ex; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not to continue extracting the exception in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
if (RetryableReadOperationExecutor.ShouldRetryOperation(operationContext, this, innerException, attempt)) | ||
{ | ||
attempt++; | ||
} | ||
else | ||
{ | ||
throw; | ||
} | ||
} | ||
} | ||
} | ||
|
||
private void ReplaceChannel(IChannelHandle channel) | ||
{ | ||
Ensure.IsNotNull(channel, nameof(channel)); | ||
_channel?.Dispose(); | ||
_channel = channel; | ||
} | ||
|
||
public void ReplaceChannelSource(IChannelSourceHandle channelSource) | ||
private void ReplaceChannelSource(IChannelSourceHandle channelSource) | ||
{ | ||
Ensure.IsNotNull(channelSource, nameof(channelSource)); | ||
_channelSource?.Dispose(); | ||
_channel?.Dispose(); | ||
_channelSource = channelSource; | ||
_channel = null; | ||
} | ||
|
||
private void Initialize(OperationContext operationContext) | ||
{ | ||
_channelSource = _binding.GetReadChannelSource(operationContext); | ||
|
||
try | ||
{ | ||
_channel = _channelSource.GetChannel(operationContext); | ||
} | ||
catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(this, ex)) | ||
{ | ||
ReplaceChannelSource(_binding.GetReadChannelSource(operationContext)); | ||
ReplaceChannel(_channelSource.GetChannel(operationContext)); | ||
} | ||
} | ||
|
||
private async Task InitializeAsync(OperationContext operationContext) | ||
{ | ||
_channelSource = await _binding.GetReadChannelSourceAsync(operationContext).ConfigureAwait(false); | ||
|
||
try | ||
{ | ||
_channel = await _channelSource.GetChannelAsync(operationContext).ConfigureAwait(false); | ||
} | ||
catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(this, ex)) | ||
{ | ||
ReplaceChannelSource(await _binding.GetReadChannelSourceAsync(operationContext).ConfigureAwait(false)); | ||
ReplaceChannel(await _channelSource.GetChannelAsync(operationContext).ConfigureAwait(false)); | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.