diff --git a/sandbox/MicroBenchmark/JSPublishBench.cs b/sandbox/MicroBenchmark/JSPublishBench.cs
index 2bbbc73b9..4c9c5d173 100644
--- a/sandbox/MicroBenchmark/JSPublishBench.cs
+++ b/sandbox/MicroBenchmark/JSPublishBench.cs
@@ -15,13 +15,16 @@ public class JSPublishBench
private NatsConnection _nats;
private NatsJSContext _js;
+ [Params(NatsRequestReplyMode.Direct, NatsRequestReplyMode.SharedInbox)]
+ public NatsRequestReplyMode Mode { get; set; }
+
[Params(1, 10, 1_000)]
public int Batch { get; set; }
[GlobalSetup]
public async Task Setup()
{
- _nats = new NatsConnection();
+ _nats = new NatsConnection(new NatsOpts { RequestReplyMode = Mode });
_js = new NatsJSContext(_nats);
await _nats.ConnectAsync();
await _js.CreateStreamAsync(new StreamConfig("bench_test1", ["bench_test1"]));
diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs
index 60b981988..60d5a12b3 100644
--- a/sandbox/MicroBenchmark/KVBench.cs
+++ b/sandbox/MicroBenchmark/KVBench.cs
@@ -16,10 +16,13 @@ public class KvBench
private NatsKVContext _kv;
private NatsKVStore _store;
+ [Params(NatsRequestReplyMode.Direct, NatsRequestReplyMode.SharedInbox)]
+ public NatsRequestReplyMode Mode { get; set; }
+
[GlobalSetup]
public async Task SetupAsync()
{
- _nats = new NatsConnection();
+ _nats = new NatsConnection(new NatsOpts { RequestReplyMode = Mode });
_js = new NatsJSContext(_nats);
_kv = new NatsKVContext(_js);
_store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark"));
diff --git a/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs b/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs
new file mode 100644
index 000000000..20de553b2
--- /dev/null
+++ b/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs
@@ -0,0 +1,13 @@
+using System.Diagnostics;
+
+namespace NATS.Client.Core.Internal;
+
+public readonly record struct NatsInstrumentationContext(
+ string Subject,
+ NatsHeaders? Headers,
+ string? ReplyTo,
+ string? QueueGroup,
+ long? BodySize,
+ long? Size,
+ INatsConnection? Connection,
+ ActivityContext? ParentContext);
diff --git a/src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs b/src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs
new file mode 100644
index 000000000..af4e1aa0d
--- /dev/null
+++ b/src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs
@@ -0,0 +1,26 @@
+using System.Diagnostics;
+
+namespace NATS.Client.Core.Internal;
+
+///
+/// Options for the OpenTelemetry instrumentation.
+///
+public sealed class NatsInstrumentationOptions
+{
+ public static NatsInstrumentationOptions Default => new();
+
+ ///
+ /// Gets or sets a filter function that determines whether or not to collect telemetry on a per request basis.
+ ///
+ ///
+ /// The return value for the filter function is interpreted as follows:
+ /// - If filter returns `true`, the request is collected.
+ /// - If filter returns `false` or throws an exception the request is NOT collected.
+ ///
+ public Func? Filter { get; set; }
+
+ ///
+ /// Gets or sets an action to enrich an Activity.
+ ///
+ public Action? Enrich { get; set; }
+}
diff --git a/src/NATS.Client.Core/Internal/Telemetry.cs b/src/NATS.Client.Core/Internal/Telemetry.cs
index fe781f5a4..c955af7a3 100644
--- a/src/NATS.Client.Core/Internal/Telemetry.cs
+++ b/src/NATS.Client.Core/Internal/Telemetry.cs
@@ -6,14 +6,13 @@ namespace NATS.Client.Core.Internal;
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
internal static class Telemetry
{
- internal static readonly ActivitySource NatsActivities = new(name: NatsActivitySource);
-
- private const string NatsActivitySource = "NATS.Net";
+ public const string NatsActivitySource = "NATS.Net";
+ public static readonly ActivitySource NatsActivities = new(name: NatsActivitySource);
private static readonly object BoxedTrue = true;
- internal static bool HasListeners() => NatsActivities.HasListeners();
+ public static bool HasListeners() => NatsActivities.HasListeners();
- internal static Activity? StartSendActivity(
+ public static Activity? StartSendActivity(
string name,
INatsConnection? connection,
string subject,
@@ -23,6 +22,19 @@ internal static class Telemetry
if (!NatsActivities.HasListeners())
return null;
+ var instrumentationContext = new NatsInstrumentationContext(
+ Subject: subject,
+ Headers: null,
+ ReplyTo: replyTo,
+ QueueGroup: null,
+ BodySize: null,
+ Size: null,
+ Connection: connection,
+ ParentContext: parentContext);
+
+ if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext))
+ return null;
+
KeyValuePair[] tags;
if (connection is NatsConnection { ServerInfo: not null } conn)
{
@@ -63,14 +75,19 @@ internal static class Telemetry
tags[3] = new KeyValuePair(Constants.ReplyTo, replyTo);
}
- return NatsActivities.StartActivity(
+ var activity = NatsActivities.StartActivity(
name,
kind: ActivityKind.Producer,
parentContext: parentContext ?? default,
tags: tags);
+
+ if (activity is not null)
+ NatsInstrumentationOptions.Default.Enrich?.Invoke(activity, instrumentationContext);
+
+ return activity;
}
- internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers)
+ public static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers)
{
if (activity is null)
return;
@@ -87,7 +104,7 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
return;
}
- // There are cases where headers reused internally (e.g. JetStream publish retry)
+ // There are cases where headers reused publicly (e.g. JetStream publish retry)
// there may also be cases where application can reuse the same header
// in which case we should still be able to overwrite headers with telemetry fields
// even though headers would be set to readonly before being passed down in publish methods.
@@ -95,7 +112,7 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
});
}
- internal static Activity? StartReceiveActivity(
+ public static Activity? StartReceiveActivity(
INatsConnection? connection,
string name,
string subscriptionSubject,
@@ -109,6 +126,22 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
if (!NatsActivities.HasListeners())
return null;
+ if (headers is null || !TryParseTraceContext(headers, out var context))
+ context = default;
+
+ var instrumentationContext = new NatsInstrumentationContext(
+ Subject: subject,
+ Headers: headers,
+ ReplyTo: replyTo,
+ QueueGroup: queueGroup,
+ BodySize: bodySize,
+ Size: size,
+ Connection: connection,
+ ParentContext: context);
+
+ if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext))
+ return null;
+
KeyValuePair[] tags;
if (connection is NatsConnection { ServerInfo: not null } conn)
{
@@ -162,17 +195,19 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
tags[9] = new KeyValuePair(Constants.ReplyTo, replyTo);
}
- if (headers is null || !TryParseTraceContext(headers, out var context))
- context = default;
-
- return NatsActivities.StartActivity(
+ var activity = NatsActivities.StartActivity(
name,
kind: ActivityKind.Consumer,
parentContext: context,
tags: tags);
+
+ if (activity is not null)
+ NatsInstrumentationOptions.Default.Enrich?.Invoke(activity, instrumentationContext);
+
+ return activity;
}
- internal static void SetException(Activity? activity, Exception exception)
+ public static void SetException(Activity? activity, Exception exception)
{
if (activity is null)
return;
@@ -251,11 +286,14 @@ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContex
},
out var traceParent,
out var traceState);
-
+#if NET6_0
return ActivityContext.TryParse(traceParent, traceState, out context);
+#else
+ return ActivityContext.TryParse(traceParent, traceState, isRemote: true, out context);
+#endif
}
- internal class Constants
+ public class Constants
{
public const string True = "true";
public const string False = "false";
diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj
index 40a77b231..a1ab2791a 100644
--- a/src/NATS.Client.Core/NATS.Client.Core.csproj
+++ b/src/NATS.Client.Core/NATS.Client.Core.csproj
@@ -1,88 +1,88 @@
-
-
-
- netstandard2.0;netstandard2.1;net6.0;net8.0
- enable
- enable
- true
-
-
- pubsub;messaging
- NATS core client for .NET
- true
-
-
-
- false
- $(NoWarn);CS8774
-
-
-
- $(NoWarn);CS8604
-
-
-
- true
-
-
-
- true
-
-
-
-
-
-
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
-
-
-
-
-
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+ netstandard2.0;netstandard2.1;net6.0;net8.0
+ enable
+ enable
+ true
+
+
+ pubsub;messaging
+ NATS core client for .NET
+ true
+
+
+
+ false
+ $(NoWarn);CS8774
+
+
+
+ $(NoWarn);CS8604
+
+
+
+ true
+
+
+
+ true
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs
index 1f5312413..241e8afbe 100644
--- a/src/NATS.Client.Core/NatsConnection.Publish.cs
+++ b/src/NATS.Client.Core/NatsConnection.Publish.cs
@@ -1,4 +1,3 @@
-using System.Diagnostics;
using NATS.Client.Core.Internal;
namespace NATS.Client.Core;
diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs
index c9921441f..d03c2bddd 100644
--- a/src/NATS.Client.Core/NatsMsg.cs
+++ b/src/NATS.Client.Core/NatsMsg.cs
@@ -369,25 +369,6 @@ public static NatsMsg Build(
headers?.SetReadOnly();
- T? data;
- if (headers?.Error == null)
- {
- try
- {
- data = serializer.Deserialize(payloadBuffer);
- }
- catch (Exception e)
- {
- headers ??= new NatsHeaders();
- headers.Error = new NatsDeserializeException(payloadBuffer.ToArray(), e);
- data = default;
- }
- }
- else
- {
- data = default;
- }
-
var size = subject.Length
+ (replyTo?.Length ?? 0)
+ (headersBuffer?.Length ?? 0)
@@ -418,6 +399,25 @@ public static NatsMsg Build(
}
}
+ T? data;
+ if (headers?.Error == null)
+ {
+ try
+ {
+ data = serializer.Deserialize(payloadBuffer);
+ }
+ catch (Exception e)
+ {
+ headers ??= new NatsHeaders();
+ headers.Error = new NatsDeserializeException(payloadBuffer.ToArray(), e);
+ data = default;
+ }
+ }
+ else
+ {
+ data = default;
+ }
+
return new NatsMsg(subject, replyTo, (int)size, headers, data, connection, flags);
}
diff --git a/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs
index 4807f55c3..a5effb37a 100644
--- a/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs
+++ b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs
@@ -26,5 +26,7 @@ public static class NatsMsgTelemetryExtensions
tags: tags);
}
- internal static ActivityContext GetActivityContext(this in NatsMsg msg) => msg.Headers?.Activity?.Context ?? default;
+ /// Gets the activity context associated with the NatsMsg.
+ public static ActivityContext GetActivityContext(this in NatsMsg msg) =>
+ msg.Headers?.Activity?.Context ?? default;
}
diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs
index 9428bb1ef..dbb4eddef 100644
--- a/src/NATS.Client.JetStream/NatsJSContext.cs
+++ b/src/NATS.Client.JetStream/NatsJSContext.cs
@@ -168,6 +168,47 @@ public async ValueTask> TryPublishAsync(
for (var i = 0; i < retryMax; i++)
{
+ if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
+ {
+ var noReply = false;
+ NatsMsg msg;
+ try
+ {
+ msg = await Connection.RequestAsync(
+ subject: subject,
+ data: data,
+ headers: headers,
+ requestSerializer: serializer,
+ replySerializer: NatsJSJsonSerializer.Default,
+ requestOpts: opts,
+ replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
+ cancellationToken).ConfigureAwait(false);
+ }
+ catch (NatsNoReplyException)
+ {
+ noReply = true;
+ msg = default;
+ }
+ catch (Exception ex)
+ {
+ return ex;
+ }
+
+ if (noReply || msg.HasNoResponders)
+ {
+ _logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax);
+ await Task.Delay(retryWait, cancellationToken);
+ continue;
+ }
+
+ if (msg.Data == null)
+ {
+ return new NatsJSException("No response data received");
+ }
+
+ return msg.Data;
+ }
+
await using var sub = await Connection.CreateRequestSubAsync(
subject: subject,
data: data,
@@ -353,6 +394,52 @@ internal async ValueTask>> TryJSRequestAsyn
// Validator.ValidateObject(request, new ValidationContext(request));
}
+ if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
+ {
+ NatsMsg> msg;
+ try
+ {
+ msg = await Connection.RequestAsync>(
+ subject: subject,
+ data: request,
+ headers: null,
+ replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
+ requestSerializer: NatsJSJsonSerializer.Default,
+ replySerializer: NatsJSJsonDocumentSerializer.Default,
+ cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+ catch (NatsNoReplyException)
+ {
+ return new NatsJSApiNoResponseException();
+ }
+ catch (NatsException e)
+ {
+ return e;
+ }
+
+ if (msg.HasNoResponders)
+ {
+ return new NatsNoRespondersException();
+ }
+
+ if (msg.Error is { } messageError)
+ {
+ return messageError;
+ }
+
+ if (msg.Data.HasException)
+ {
+ return msg.Data.Exception;
+ }
+
+ if (msg.Data.HasError)
+ {
+ return new NatsJSResponse(null, msg.Data.Error);
+ }
+
+ return new NatsJSResponse(msg.Data.Value, null);
+ }
+
await using var sub = await Connection.CreateRequestSubAsync>(
subject: subject,
data: request,
diff --git a/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs
index 5dbb89f2d..f22eff883 100644
--- a/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs
@@ -6,8 +6,10 @@ namespace NATS.Client.JetStream.Tests;
public class ConsumeResponseTest
{
- [Fact]
- public async Task Consume_response()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Consume_response(NatsRequestReplyMode mode)
{
var headers = new Stack();
headers.Push("NATS/1.0 400 Bad Test Request");
@@ -35,7 +37,7 @@ public async Task Consume_response()
});
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
- await using var nats = new NatsConnection(new NatsOpts { Url = ms.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = ms.Url, RequestReplyMode = mode });
var js = nats.CreateJetStreamContext();
var consumer = await js.GetConsumerAsync("x", "x", cts.Token);
diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
index ff085bc48..ed1ed755d 100644
--- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
@@ -15,11 +15,13 @@ public ConsumerFetchTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}
- [Fact]
- public async Task Fetch_test()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Fetch_test(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token);
@@ -45,11 +47,13 @@ public async Task Fetch_test()
Assert.Equal(10, count);
}
- [Fact]
- public async Task FetchNoWait_test()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task FetchNoWait_test(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token);
@@ -73,10 +77,14 @@ public async Task FetchNoWait_test()
Assert.Equal(10, count);
}
- [Fact]
- public async Task Fetch_dispose_test()
+ [Theory]
+
+ // TODO: Fix this test
+ // [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Fetch_dispose_test(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
@@ -104,8 +112,10 @@ public async Task Fetch_dispose_test()
var signal2 = new WaitSignal();
var reader = Task.Run(async () =>
{
+ var x = 0;
await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token))
{
+ _output.WriteLine($"rcv:{++x}");
await msg.AckAsync(cancellationToken: cts.Token);
signal1.Pulse();
await signal2;
@@ -124,6 +134,7 @@ await Retry.Until(
async () =>
{
var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token);
+ _output.WriteLine($"pend1:{c.Info.NumAckPending}");
return c.Info.NumAckPending == 9;
},
retryDelay: TimeSpan.FromSeconds(1),
@@ -140,6 +151,7 @@ await Retry.Until(
async () =>
{
var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token);
+ _output.WriteLine($"pend:{c.Info.NumAckPending}");
return c.Info.NumAckPending == 0;
},
retryDelay: TimeSpan.FromSeconds(1),
diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs
index 92ceef15b..3ef4feffd 100644
--- a/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs
@@ -19,10 +19,12 @@ public ConsumerSetupTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}
- [Fact]
- public async Task Create_push_consumer()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create_push_consumer(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
@@ -82,10 +84,12 @@ await js.CreateOrUpdateConsumerAsync(
Assert.Equal(pauseUntil, config.PauseUntil);
}
- [Fact]
- public async Task Consumer_config()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Consumer_config(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContextFactory().CreateContext(nats);
diff --git a/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs b/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
index c7d3cd3b9..fa19e9ac9 100644
--- a/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
+++ b/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
@@ -17,11 +17,13 @@ public DoubleAckNakDelayTests(ITestOutputHelper output, NatsServerFixture server
_server = server;
}
- [Fact]
- public async Task Double_ack_received_messages()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Double_ack_received_messages(NatsRequestReplyMode mode)
{
var proxy = _server.CreateProxy();
- await using var nats = proxy.CreateNatsConnection();
+ await using var nats = proxy.CreateNatsConnection(mode);
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
@@ -53,11 +55,13 @@ public async Task Double_ack_received_messages()
}
}
- [Fact]
- public async Task Delay_nak_received_messages()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Delay_nak_received_messages(NatsRequestReplyMode mode)
{
var proxy = _server.CreateProxy();
- await using var nats = proxy.CreateNatsConnection();
+ await using var nats = proxy.CreateNatsConnection(mode);
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
index c4cb878a3..5f6438d00 100644
--- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
@@ -49,11 +49,13 @@ public async Task Stream_invalid_name_test(string? streamName)
await Assert.ThrowsAnyAsync(async () => await jsmContext.DeleteMessageAsync(streamName!, new StreamMsgDeleteRequest()));
}
- [Fact]
- public async Task Create_stream_test()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create_stream_test(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10) });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode });
// Happy user
{
diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
index a664c2a5d..19713d0a0 100644
--- a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
@@ -17,10 +17,12 @@ public ManageConsumerTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}
- [Fact]
- public async Task Create_get_consumer()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create_get_consumer(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10) });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode });
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
@@ -47,10 +49,12 @@ public async Task Create_get_consumer()
}
}
- [Fact]
- public async Task List_delete_consumer()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task List_delete_consumer(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs
index cc0f8355d..2f938df3a 100644
--- a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs
@@ -18,11 +18,13 @@ public ManageStreamTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}
- [Fact]
- public async Task Account_info_create_get_update_stream()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Account_info_create_get_update_stream(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
await nats.ConnectRetryAsync();
var js = new NatsJSContext(nats);
@@ -67,10 +69,12 @@ public async Task Account_info_create_get_update_stream()
}
}
- [Fact]
- public async Task List_delete_stream()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task List_delete_stream(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId() + "-";
await nats.ConnectRetryAsync();
@@ -113,10 +117,12 @@ public async Task List_delete_stream()
}
}
- [Fact]
- public async Task Delete_one_msg()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Delete_one_msg(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
await nats.ConnectRetryAsync();
@@ -145,13 +151,15 @@ public async Task Delete_one_msg()
Assert.Equal(2, stream.Info.State.Subjects?.Count);
}
- [Fact]
- public async Task Create_or_update_stream_should_be_create_stream_if_stream_doesnt_exist()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create_or_update_stream_should_be_create_stream_if_stream_doesnt_exist(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
await nats.ConnectRetryAsync();
var js = new NatsJSContext(nats);
@@ -167,10 +175,12 @@ public async Task Create_or_update_stream_should_be_create_stream_if_stream_does
Assert.Equal(1, accountInfoAfter.Streams);
}
- [Fact]
- public async Task Create_or_update_stream_should_be_update_stream_if_stream_exist()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create_or_update_stream_should_be_update_stream_if_stream_exist(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
await nats.ConnectRetryAsync();
@@ -189,10 +199,12 @@ public async Task Create_or_update_stream_should_be_update_stream_if_stream_exis
Assert.True(updatedStream.Info.Config.NoAck);
}
- [Fact]
- public async Task Create_or_update_stream_should_be_throwing_update_operation_errors()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create_or_update_stream_should_be_throwing_update_operation_errors(NatsRequestReplyMode mode)
{
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
await nats.ConnectRetryAsync();
diff --git a/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs b/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs
index 5dd376d9f..21ce4d10a 100644
--- a/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs
@@ -17,11 +17,13 @@ public PublishRetryTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}
- [Fact]
- public async Task Publish_without_telemetry()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Publish_without_telemetry(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
- await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
// Without telemetry
diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs
index 1aa0d0060..0546472b7 100644
--- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs
+++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs
@@ -21,10 +21,17 @@ public PublishTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}
- [Fact]
- public async Task Publish_test()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Publish_test(NatsRequestReplyMode mode)
{
- await using var nats = _server.CreateNatsConnection();
+ await using var nats = new NatsConnection(new NatsOpts
+ {
+ Url = _server.Url,
+ ConnectTimeout = TimeSpan.FromSeconds(10),
+ RequestReplyMode = mode,
+ });
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
@@ -173,8 +180,10 @@ public async Task Publish_test()
}
}
- [Fact]
- public async Task Publish_retry_test()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Publish_retry_test(NatsRequestReplyMode mode)
{
var retryCount = 0;
var logger = new InMemoryTestLoggerFactory(LogLevel.Debug, log =>
@@ -192,6 +201,7 @@ public async Task Publish_retry_test()
ConnectTimeout = TimeSpan.FromSeconds(10),
RequestTimeout = TimeSpan.FromSeconds(3), // give enough time for retries to avoid NatsJSPublishNoResponseExceptions
LoggerFactory = logger,
+ RequestReplyMode = mode,
});
var prefix = _server.GetNextId();
@@ -275,11 +285,13 @@ await Assert.ThrowsAsync(async () =>
}
}
- [Fact]
- public async Task Publish_no_responders()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Publish_no_responders(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = nats.CreateJetStreamContext();
var result = await js.TryPublishAsync("foo", 1);
Assert.IsType(result.Error);
diff --git a/tests/NATS.Client.JetStream.Tests/Utils.cs b/tests/NATS.Client.JetStream.Tests/Utils.cs
index d72e65a0f..e26bae283 100644
--- a/tests/NATS.Client.JetStream.Tests/Utils.cs
+++ b/tests/NATS.Client.JetStream.Tests/Utils.cs
@@ -15,11 +15,12 @@ public static ValueTask CreateStreamAsync(this NatsJSContext cont
public static NatsProxy CreateProxy(this NatsServerFixture server)
=> new(new Uri(server.Url).Port);
- public static NatsConnection CreateNatsConnection(this NatsProxy proxy)
+ public static NatsConnection CreateNatsConnection(this NatsProxy proxy, NatsRequestReplyMode mode = NatsRequestReplyMode.SharedInbox)
=> new(new NatsOpts
{
Url = $"nats://127.0.0.1:{proxy.Port}",
ConnectTimeout = TimeSpan.FromSeconds(10),
+ RequestReplyMode = mode,
});
public static NatsConnection CreateNatsConnection(this NatsServerFixture server)
diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
index c78e71305..407b2fb1a 100644
--- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
+++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
@@ -12,11 +12,13 @@ public class KeyValueStoreTest
public KeyValueStoreTest(ITestOutputHelper output) => _output = output;
- [Fact]
- public async Task Simple_create_put_get_test()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Simple_create_put_get_test(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -31,11 +33,13 @@ public async Task Simple_create_put_get_test()
Assert.Equal("v1", entry.Value);
}
- [Fact]
- public async Task Handle_non_direct_gets()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Handle_non_direct_gets(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -69,14 +73,16 @@ await js.CreateStreamAsync(new StreamConfig
Assert.Equal("v1", entry.Value);
}
- [Fact]
- public async Task Get_keys()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Get_keys(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -106,14 +112,16 @@ public async Task Get_keys()
Assert.Equal(total, count);
}
- [Fact]
- public async Task Get_key_revisions()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Get_key_revisions(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -164,14 +172,16 @@ public async Task Get_key_revisions()
}
}
- [Fact]
- public async Task Delete_and_purge()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Delete_and_purge(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -290,14 +300,16 @@ await Assert.ThrowsAsync(async () =>
}
}
- [Fact]
- public async Task Purge_deletes()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Purge_deletes(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -372,14 +384,16 @@ public async Task Purge_deletes()
await store.PurgeDeletesAsync(opts: new NatsKVPurgeOpts { DeleteMarkersThreshold = TimeSpan.Zero }, cancellationToken: cancellationToken);
}
- [Fact]
- public async Task Update_with_revisions()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Update_with_revisions(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -410,14 +424,16 @@ await Assert.ThrowsAsync(async () =>
}
}
- [Fact]
- public async Task Create()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Create(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -479,14 +495,16 @@ public async Task TestMessageTTLApiNotSupportedupport()
_output.WriteLine(exception.Message);
}
- [SkipIfNatsServer(versionEarlierThan: "2.11")]
- public async Task TestMessageTTL()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.11")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task TestMessageTTL(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -539,14 +557,16 @@ await Retry.Until(
Assert.Equal(20ul, state.Info.State.LastSeq);
}
- [SkipIfNatsServer(versionEarlierThan: "2.11")]
- public async Task TestTTLMessageWhenTTLDisabledOnStream()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.11")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task TestTTLMessageWhenTTLDisabledOnStream(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -556,14 +576,16 @@ public async Task TestTTLMessageWhenTTLDisabledOnStream()
Assert.Equal("This store does not support TTL", exception.Message);
}
- [SkipIfNatsServer(versionEarlierThan: "2.11")]
- public async Task SetsSubjectDeleteMarkerTTL()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.11")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task SetsSubjectDeleteMarkerTTL(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -573,11 +595,13 @@ public async Task SetsSubjectDeleteMarkerTTL()
Assert.Equal(TimeSpan.FromSeconds(2), info.Info.Config.SubjectDeleteMarkerTTL);
}
- [SkipIfNatsServer(versionEarlierThan: "2.11")]
- public async Task SubjectDeleteMarkerTTL_enabled_removals_should_be_interpreted_as_Operation_Purge()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.11")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task SubjectDeleteMarkerTTL_enabled_removals_should_be_interpreted_as_Operation_Purge(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -622,14 +646,16 @@ public async Task SubjectDeleteMarkerTTL_enabled_removals_should_be_interpreted_
Assert.Equal(3ul, r2);
}
- [SkipIfNatsServer(versionEarlierThan: "2.11")]
- public async Task TestMessageNeverExpire()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.11")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task TestMessageNeverExpire(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -667,14 +693,16 @@ await Retry.Until(
Assert.Equal(21ul, state.Info.State.LastSeq);
}
- [Fact]
- public async Task History()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task History(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -711,14 +739,16 @@ public async Task History()
}
}
- [Fact]
- public async Task Status()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Status(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -750,11 +780,13 @@ public async Task Status()
}
}
- [SkipIfNatsServer(versionEarlierThan: "2.10")]
- public async Task Compressed_storage()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.10")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Compressed_storage(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -779,11 +811,13 @@ public async Task Compressed_storage()
Assert.Equal(StreamConfigCompression.S2, status2.Info.Config.Compression);
}
- [Fact]
- public async Task Validate_keys()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Validate_keys(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -832,8 +866,10 @@ public async Task Validate_keys()
}
}
- [Fact]
- public async Task TestDirectMessageRepublishedSubject()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task TestDirectMessageRepublishedSubject(NatsRequestReplyMode mode)
{
var streamBucketName = "sb-" + Nuid.NewNuid();
var subject = "test";
@@ -846,7 +882,7 @@ public async Task TestDirectMessageRepublishedSubject()
var streamConfig = new StreamConfig(streamBucketName, new[] { streamSubject }) { Republish = new Republish { Src = ">", Dest = republishDest } };
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -873,11 +909,13 @@ public async Task TestDirectMessageRepublishedSubject()
Assert.Equal("tres", kve3.Value);
}
- [SkipIfNatsServer(versionEarlierThan: "2.10")]
- public async Task Test_CombinedSources()
+ [SkipIfNatsServerTheory(versionEarlierThan: "2.10")]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Test_CombinedSources(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -920,11 +958,13 @@ await Retry.Until(
Assert.Equal("b_fromStore2", entryB.Value);
}
- [Fact]
- public async Task Try_Create()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Try_Create(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -954,11 +994,13 @@ public async Task Try_Create()
Assert.ThrowsAny(() => finalValue.Error);
}
- [Fact]
- public async Task Try_Delete()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Try_Delete(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
@@ -979,11 +1021,13 @@ public async Task Try_Delete()
Assert.ThrowsAny(() => updateResultSuccess.Error);
}
- [Fact]
- public async Task Try_Update()
+ [Theory]
+ [InlineData(NatsRequestReplyMode.Direct)]
+ [InlineData(NatsRequestReplyMode.SharedInbox)]
+ public async Task Try_Update(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
- await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode });
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
diff --git a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj
index 741e25d1a..bf0f9c3cd 100644
--- a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj
+++ b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj
@@ -1,50 +1,50 @@
-
- netstandard2.0;net8.0
- enable
- enable
- false
- $(NoWarn);CS8002
-
+
+ netstandard2.0;net8.0
+ enable
+ enable
+ false
+ $(NoWarn);CS8002
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
-
-
+
+
+
diff --git a/tests/NATS.Client.TestUtilities/NatsServerExe.cs b/tests/NATS.Client.TestUtilities/NatsServerExe.cs
index 3645ecca9..4fcd8f29b 100644
--- a/tests/NATS.Client.TestUtilities/NatsServerExe.cs
+++ b/tests/NATS.Client.TestUtilities/NatsServerExe.cs
@@ -93,6 +93,31 @@ public SkipIfNatsServer(bool doesNotSupportTlsFirst = false, string? versionEarl
}
}
+public sealed class SkipIfNatsServerTheory : TheoryAttribute
+{
+ private static readonly bool SupportsTlsFirst;
+
+ static SkipIfNatsServerTheory() => SupportsTlsFirst = NatsServerExe.SupportsTlsFirst();
+
+ public SkipIfNatsServerTheory(bool doesNotSupportTlsFirst = false, string? versionEarlierThan = default, string? versionLaterThan = default)
+ {
+ if (doesNotSupportTlsFirst && !SupportsTlsFirst)
+ {
+ Skip = "NATS server doesn't support TLS first";
+ }
+
+ if (versionEarlierThan != null && new Version(versionEarlierThan) > NatsServerExe.Version)
+ {
+ Skip = $"NATS server version ({NatsServerExe.Version}) is earlier than {versionEarlierThan}";
+ }
+
+ if (versionLaterThan != null && new Version(versionLaterThan) < NatsServerExe.Version)
+ {
+ Skip = $"NATS server version ({NatsServerExe.Version}) is later than {versionLaterThan}";
+ }
+ }
+}
+
public sealed class SkipOnPlatform : FactAttribute
{
public SkipOnPlatform(string platform, string reason)
diff --git a/version.txt b/version.txt
index 6a6a3d8e3..67a287611 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-2.6.1
+2.6.2-preview.1