Skip to content

Add RCP timeout #385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public record ClientParameters
public IDictionary<string, string> Properties { get; } =
new Dictionary<string, string>
{
{"product", "RabbitMQ Stream"},
{"version", Version.VersionString},
{"platform", ".NET"},
{ "product", "RabbitMQ Stream" },
{ "version", Version.VersionString },
{ "platform", ".NET" },
{
"copyright",
"Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."
Expand All @@ -48,7 +48,7 @@ public record ClientParameters
"information",
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
},
{"connection_name", "Unknown"}
{ "connection_name", "Unknown" }
};

public string UserName { get; set; } = "guest";
Expand Down Expand Up @@ -77,6 +77,8 @@ public string ClientProvidedName

public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;

public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10);

internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
{
OnMetadataUpdate?.Invoke(metaDataUpdate);
Expand Down Expand Up @@ -113,8 +115,6 @@ public class Client : IClient
{
private bool isClosed = true;

private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10);

private uint correlationId = 0; // allow for some pre-amble

private Connection _connection;
Expand Down Expand Up @@ -150,7 +150,6 @@ public class Client : IClient

public int IncomingFrames => _connection.NumFrames;

//public int IncomingChannelCount => this.incoming.Reader.Count;
private static readonly object Obj = new();

private readonly ILogger _logger;
Expand Down Expand Up @@ -494,7 +493,7 @@ private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSp
var tcs = PooledTaskSource<TOut>.Rent();
requests.TryAdd(corr, tcs);
await Publish(request(corr)).ConfigureAwait(false);
using var cts = new CancellationTokenSource(timeout ?? defaultTimeout);
using var cts = new CancellationTokenSource(timeout ?? Parameters.RpcTimeOut);
await using (cts.Token.Register(
valueTaskSource =>
((ManualResetValueTaskSource<TOut>)valueTaskSource).SetException(
Expand Down Expand Up @@ -973,7 +972,16 @@ public bool RunContinuationsAsynchronously
public short Version => _logic.Version;
public void Reset() => _logic.Reset();
public void SetResult(T result) => _logic.SetResult(result);
public void SetException(Exception error) => _logic.SetException(error);

public void SetException(Exception error)
{
// https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/384
// we need to check if the task is pending before setting the exception
if (_logic.GetStatus(_logic.Version) == ValueTaskSourceStatus.Pending)
{
_logic.SetException(error);
}
}

void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);
T IValueTaskSource<T>.GetResult(short token) => _logic.GetResult(token);
Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Cli
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void
RabbitMQ.Stream.Client.ConnectionItem
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
Expand Down Expand Up @@ -324,6 +326,8 @@ RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.C
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.get -> RabbitMQ.Stream.Client.ConnectionPoolConfig
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.set -> void
RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.SuperStreamSpec
RabbitMQ.Stream.Client.SuperStreamSpec.Args.get -> System.Collections.Generic.IDictionary<string, string>
RabbitMQ.Stream.Client.SuperStreamSpec.LeaderLocator.set -> void
Expand Down
15 changes: 14 additions & 1 deletion RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ internal void Validate()
{
throw new ArgumentException("ConnectionPoolConfig can't be null");
}

if (RpcTimeOut < TimeSpan.FromSeconds(1))
{
throw new ArgumentException("RpcTimeOut must be at least 1 second");
}
}

public string UserName { get; set; } = "guest";
Expand All @@ -44,6 +49,13 @@ internal void Validate()
/// Configure the connection pool for producers and consumers.
/// </summary>
public ConnectionPoolConfig ConnectionPoolConfig { get; set; } = new();

/// <summary>
/// The timeout for RPC calls, like PeerProperties, QueryMetadata, etc.
/// Default value is 10 seconds and in most cases it should be enough.
/// Low value can cause false errors in the client.
/// </summary>
public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10);
}

public class StreamSystem
Expand Down Expand Up @@ -85,7 +97,8 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
ClientProvidedName = config.ClientProvidedName,
Heartbeat = config.Heartbeat,
Endpoints = config.Endpoints,
AuthMechanism = config.AuthMechanism
AuthMechanism = config.AuthMechanism,
RpcTimeOut = config.RpcTimeOut
};
// create the metadata client connection
foreach (var endPoint in clientParams.Endpoints)
Expand Down
9 changes: 9 additions & 0 deletions Tests/SystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ await Assert.ThrowsAsync<AuthMechanismNotSupportedException>(
);
}

[Fact]
public async void ValidateRpCtimeOut()
{
var config = new StreamSystemConfig() { RpcTimeOut = TimeSpan.FromMilliseconds(1) };
await Assert.ThrowsAsync<ArgumentException>(
async () => { await StreamSystem.Create(config); }
);
}

[Fact]
public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError()
{
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/stream_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ metadata:
namespace: stream-clients-test
spec:
replicas: 3
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
service:
type: LoadBalancer
# tls:
Expand Down