From 5cc35288975b7fe14a150280a7255eba44589f1a Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Thu, 22 May 2025 12:02:46 -0400 Subject: [PATCH 01/10] Add support for `Filter` and `Enrich` for OpenTelemetry activities --- .../Internal/NatsInstrumentationContext.cs | 10 + .../Internal/NatsInstrumentationOptions.cs | 26 +++ src/NATS.Client.Core/Internal/Telemetry.cs | 43 ++++- .../TracerProviderBuilderExtensions.cs | 21 +++ src/NATS.Client.Core/NATS.Client.Core.csproj | 177 +++++++++--------- .../NatsConnection.Publish.cs | 1 - 6 files changed, 184 insertions(+), 94 deletions(-) create mode 100644 src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs create mode 100644 src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs create mode 100644 src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs diff --git a/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs b/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs new file mode 100644 index 000000000..ca0ee066e --- /dev/null +++ b/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs @@ -0,0 +1,10 @@ +namespace NATS.Client.Core.Internal; + +public readonly record struct NatsInstrumentationContext( + string Subject, + NatsHeaders? Headers, + string? ReplyTo, + string? QueueGroup, + long? BodySize, + long? Size, + INatsConnection? Connection); diff --git a/src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs b/src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs new file mode 100644 index 000000000..af4e1aa0d --- /dev/null +++ b/src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs @@ -0,0 +1,26 @@ +using System.Diagnostics; + +namespace NATS.Client.Core.Internal; + +/// +/// Options for the OpenTelemetry instrumentation. +/// +public sealed class NatsInstrumentationOptions +{ + public static NatsInstrumentationOptions Default => new(); + + /// + /// Gets or sets a filter function that determines whether or not to collect telemetry on a per request basis. + /// + /// + /// The return value for the filter function is interpreted as follows: + /// - If filter returns `true`, the request is collected. + /// - If filter returns `false` or throws an exception the request is NOT collected. + /// + public Func? Filter { get; set; } + + /// + /// Gets or sets an action to enrich an Activity. + /// + public Action? Enrich { get; set; } +} diff --git a/src/NATS.Client.Core/Internal/Telemetry.cs b/src/NATS.Client.Core/Internal/Telemetry.cs index fe781f5a4..e82321e42 100644 --- a/src/NATS.Client.Core/Internal/Telemetry.cs +++ b/src/NATS.Client.Core/Internal/Telemetry.cs @@ -6,9 +6,8 @@ namespace NATS.Client.Core.Internal; // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes internal static class Telemetry { + internal const string NatsActivitySource = "NATS.Net"; internal static readonly ActivitySource NatsActivities = new(name: NatsActivitySource); - - private const string NatsActivitySource = "NATS.Net"; private static readonly object BoxedTrue = true; internal static bool HasListeners() => NatsActivities.HasListeners(); @@ -23,6 +22,18 @@ internal static class Telemetry if (!NatsActivities.HasListeners()) return null; + var instrumentationContext = new NatsInstrumentationContext( + Subject: subject, + Headers: null, + ReplyTo: replyTo, + QueueGroup: null, + BodySize: null, + Size: null, + Connection: connection); + + if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext)) + return null; + KeyValuePair[] tags; if (connection is NatsConnection { ServerInfo: not null } conn) { @@ -63,11 +74,16 @@ internal static class Telemetry tags[3] = new KeyValuePair(Constants.ReplyTo, replyTo); } - return NatsActivities.StartActivity( + var activity = NatsActivities.StartActivity( name, kind: ActivityKind.Producer, parentContext: parentContext ?? default, tags: tags); + + if (activity is not null) + NatsInstrumentationOptions.Default.Enrich?.Invoke(activity, instrumentationContext); + + return activity; } internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers) @@ -109,6 +125,18 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? if (!NatsActivities.HasListeners()) return null; + var instrumentationContext = new NatsInstrumentationContext( + Subject: subject, + Headers: headers, + ReplyTo: replyTo, + QueueGroup: queueGroup, + BodySize: bodySize, + Size: size, + Connection: connection); + + if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext)) + return null; + KeyValuePair[] tags; if (connection is NatsConnection { ServerInfo: not null } conn) { @@ -165,11 +193,16 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? if (headers is null || !TryParseTraceContext(headers, out var context)) context = default; - return NatsActivities.StartActivity( + var activity = NatsActivities.StartActivity( name, kind: ActivityKind.Consumer, parentContext: context, tags: tags); + + if (activity is not null) + NatsInstrumentationOptions.Default.Enrich?.Invoke(activity, instrumentationContext); + + return activity; } internal static void SetException(Activity? activity, Exception exception) @@ -252,7 +285,7 @@ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContex out var traceParent, out var traceState); - return ActivityContext.TryParse(traceParent, traceState, out context); + return ActivityContext.TryParse(traceParent, traceState, isRemote: true, out context); } internal class Constants diff --git a/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs b/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs new file mode 100644 index 000000000..8621a5ad2 --- /dev/null +++ b/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs @@ -0,0 +1,21 @@ +using OpenTelemetry.Trace; + +namespace NATS.Client.Core.Internal; + +public static class TracerProviderBuilderExtensions +{ + public static TracerProviderBuilder AddNatsInstrumentation( + this TracerProviderBuilder builder, + Action? configure = null) + { + if (configure is not null) + configure(NatsInstrumentationOptions.Default); + + // builder.ConfigureServices(services => + // { + // services.Configure(configure ?? (_ => { })); + // }); + builder.AddSource(Telemetry.NatsActivitySource); + return builder; + } +} diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 40a77b231..90817aa20 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -1,88 +1,89 @@ - - - - netstandard2.0;netstandard2.1;net6.0;net8.0 - enable - enable - true - - - pubsub;messaging - NATS core client for .NET - true - - - - false - $(NoWarn);CS8774 - - - - $(NoWarn);CS8604 - - - - true - - - - true - - - - - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - - - all - runtime; build; native; contentfiles; analyzers - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + netstandard2.0;netstandard2.1;net6.0;net8.0 + enable + enable + true + + + pubsub;messaging + NATS core client for .NET + true + + + + false + $(NoWarn);CS8774 + + + + $(NoWarn);CS8604 + + + + true + + + + true + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 1f5312413..241e8afbe 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -1,4 +1,3 @@ -using System.Diagnostics; using NATS.Client.Core.Internal; namespace NATS.Client.Core; From 60508e248f7b6db0d329a277b5e656e239abc5d2 Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Thu, 22 May 2025 12:17:04 -0400 Subject: [PATCH 02/10] Make `internal` methods in `internal Telemetry` `public` --- src/NATS.Client.Core/Internal/Telemetry.cs | 18 +++++++++--------- .../TracerProviderBuilderExtensions.cs | 4 ---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/NATS.Client.Core/Internal/Telemetry.cs b/src/NATS.Client.Core/Internal/Telemetry.cs index e82321e42..b077c62d1 100644 --- a/src/NATS.Client.Core/Internal/Telemetry.cs +++ b/src/NATS.Client.Core/Internal/Telemetry.cs @@ -6,13 +6,13 @@ namespace NATS.Client.Core.Internal; // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes internal static class Telemetry { - internal const string NatsActivitySource = "NATS.Net"; - internal static readonly ActivitySource NatsActivities = new(name: NatsActivitySource); + public const string NatsActivitySource = "NATS.Net"; + public static readonly ActivitySource NatsActivities = new(name: NatsActivitySource); private static readonly object BoxedTrue = true; - internal static bool HasListeners() => NatsActivities.HasListeners(); + public static bool HasListeners() => NatsActivities.HasListeners(); - internal static Activity? StartSendActivity( + public static Activity? StartSendActivity( string name, INatsConnection? connection, string subject, @@ -86,7 +86,7 @@ internal static class Telemetry return activity; } - internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers) + public static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers) { if (activity is null) return; @@ -103,7 +103,7 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? return; } - // There are cases where headers reused internally (e.g. JetStream publish retry) + // There are cases where headers reused publicly (e.g. JetStream publish retry) // there may also be cases where application can reuse the same header // in which case we should still be able to overwrite headers with telemetry fields // even though headers would be set to readonly before being passed down in publish methods. @@ -111,7 +111,7 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? }); } - internal static Activity? StartReceiveActivity( + public static Activity? StartReceiveActivity( INatsConnection? connection, string name, string subscriptionSubject, @@ -205,7 +205,7 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? return activity; } - internal static void SetException(Activity? activity, Exception exception) + public static void SetException(Activity? activity, Exception exception) { if (activity is null) return; @@ -288,7 +288,7 @@ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContex return ActivityContext.TryParse(traceParent, traceState, isRemote: true, out context); } - internal class Constants + public class Constants { public const string True = "true"; public const string False = "false"; diff --git a/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs b/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs index 8621a5ad2..240704486 100644 --- a/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs +++ b/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs @@ -11,10 +11,6 @@ public static TracerProviderBuilder AddNatsInstrumentation( if (configure is not null) configure(NatsInstrumentationOptions.Default); - // builder.ConfigureServices(services => - // { - // services.Configure(configure ?? (_ => { })); - // }); builder.AddSource(Telemetry.NatsActivitySource); return builder; } From 1613fe21ff6f42652c2786c7e4453ff755c9b6bf Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Thu, 22 May 2025 12:25:52 -0400 Subject: [PATCH 03/10] Fix package versions and whatnot --- src/NATS.Client.Core/NATS.Client.Core.csproj | 2 +- .../NATS.Client.TestUtilities.csproj | 82 +++++++++---------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 90817aa20..5d4d3e80b 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -37,7 +37,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj index 741e25d1a..bf0f9c3cd 100644 --- a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj +++ b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj @@ -1,50 +1,50 @@ - - netstandard2.0;net8.0 - enable - enable - false - $(NoWarn);CS8002 - + + netstandard2.0;net8.0 + enable + enable + false + $(NoWarn);CS8002 + - - - - - - - + + + + + + + - - - - - - - - - - all - runtime; build; native; contentfiles; analyzers - - - all - runtime; build; native; contentfiles; analyzers - - + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + all + runtime; build; native; contentfiles; analyzers + + - - - - - - - + + + + + + + - - - + + + From 923213504082040d15f9ac5efb09b3ed597f15f7 Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Fri, 23 May 2025 09:54:19 -0400 Subject: [PATCH 04/10] Remove `TracerProviderBuilderExtensions` --- src/NATS.Client.Core/Internal/Telemetry.cs | 5 ++++- .../Internal/TracerProviderBuilderExtensions.cs | 17 ----------------- src/NATS.Client.Core/NATS.Client.Core.csproj | 1 - .../NatsConnection.RequestReply.cs | 4 +++- 4 files changed, 7 insertions(+), 20 deletions(-) delete mode 100644 src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs diff --git a/src/NATS.Client.Core/Internal/Telemetry.cs b/src/NATS.Client.Core/Internal/Telemetry.cs index b077c62d1..2ec9aa168 100644 --- a/src/NATS.Client.Core/Internal/Telemetry.cs +++ b/src/NATS.Client.Core/Internal/Telemetry.cs @@ -284,8 +284,11 @@ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContex }, out var traceParent, out var traceState); - +#if NET6_0 + return ActivityContext.TryParse(traceParent, traceState, out context); +#else return ActivityContext.TryParse(traceParent, traceState, isRemote: true, out context); +#endif } public class Constants diff --git a/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs b/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs deleted file mode 100644 index 240704486..000000000 --- a/src/NATS.Client.Core/Internal/TracerProviderBuilderExtensions.cs +++ /dev/null @@ -1,17 +0,0 @@ -using OpenTelemetry.Trace; - -namespace NATS.Client.Core.Internal; - -public static class TracerProviderBuilderExtensions -{ - public static TracerProviderBuilder AddNatsInstrumentation( - this TracerProviderBuilder builder, - Action? configure = null) - { - if (configure is not null) - configure(NatsInstrumentationOptions.Default); - - builder.AddSource(Telemetry.NatsActivitySource); - return builder; - } -} diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 5d4d3e80b..a1ab2791a 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -37,7 +37,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index f0dec997f..d639bef70 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -45,7 +45,9 @@ public async ValueTask> RequestAsync( using var rt = _replyTaskFactory.CreateReplyTask(replySerializer, replyOpts.Timeout); requestSerializer ??= Opts.SerializerRegistry.GetSerializer(); await PublishAsync(subject, data, headers, rt.Subject, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); - return await rt.GetResultAsync(cancellationToken).ConfigureAwait(false); + var reply = await rt.GetResultAsync(cancellationToken).ConfigureAwait(false); + reply.Headers?.Activity?.Dispose(); + return reply; } await using var sub1 = await CreateRequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) From dfb00f577021a1425e97e9f93fd93e15e76db13f Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Fri, 23 May 2025 19:40:10 -0400 Subject: [PATCH 05/10] Include `Deserialize` in the receive activity --- src/NATS.Client.Core/NatsMsg.cs | 38 ++++++++++++++++----------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index c9921441f..d03c2bddd 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -369,25 +369,6 @@ public static NatsMsg Build( headers?.SetReadOnly(); - T? data; - if (headers?.Error == null) - { - try - { - data = serializer.Deserialize(payloadBuffer); - } - catch (Exception e) - { - headers ??= new NatsHeaders(); - headers.Error = new NatsDeserializeException(payloadBuffer.ToArray(), e); - data = default; - } - } - else - { - data = default; - } - var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) @@ -418,6 +399,25 @@ public static NatsMsg Build( } } + T? data; + if (headers?.Error == null) + { + try + { + data = serializer.Deserialize(payloadBuffer); + } + catch (Exception e) + { + headers ??= new NatsHeaders(); + headers.Error = new NatsDeserializeException(payloadBuffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + return new NatsMsg(subject, replyTo, (int)size, headers, data, connection, flags); } From b278ba6dee3b8f1c875e752506bf70505dc466d9 Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Fri, 23 May 2025 19:42:33 -0400 Subject: [PATCH 06/10] Revert back accidental change --- src/NATS.Client.Core/NatsConnection.RequestReply.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index d639bef70..f0dec997f 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -45,9 +45,7 @@ public async ValueTask> RequestAsync( using var rt = _replyTaskFactory.CreateReplyTask(replySerializer, replyOpts.Timeout); requestSerializer ??= Opts.SerializerRegistry.GetSerializer(); await PublishAsync(subject, data, headers, rt.Subject, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); - var reply = await rt.GetResultAsync(cancellationToken).ConfigureAwait(false); - reply.Headers?.Activity?.Dispose(); - return reply; + return await rt.GetResultAsync(cancellationToken).ConfigureAwait(false); } await using var sub1 = await CreateRequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) From 4bc0f0272057bf4c8cca1222d01ddcf404307836 Mon Sep 17 00:00:00 2001 From: mtmk Date: Sat, 24 May 2025 01:57:15 +0100 Subject: [PATCH 07/10] Add RequestReplyMode for JetStream (#867) * Add RequestReplyMode for JetStream * Add benchmarks --- sandbox/MicroBenchmark/JSPublishBench.cs | 5 +- sandbox/MicroBenchmark/KVBench.cs | 5 +- src/NATS.Client.JetStream/NatsJSContext.cs | 87 +++++++++ .../ConsumeResponseTest.cs | 8 +- .../ConsumerFetchTest.cs | 30 ++- .../ConsumerSetupTest.cs | 16 +- .../DoubleAckNakDelayTests.cs | 16 +- .../JetStreamTest.cs | 8 +- .../ManageConsumerTest.cs | 16 +- .../ManageStreamTest.cs | 48 +++-- .../PublishRetryTest.cs | 8 +- .../PublishTest.cs | 28 ++- tests/NATS.Client.JetStream.Tests/Utils.cs | 3 +- .../KeyValueStoreTest.cs | 176 +++++++++++------- .../NatsServerExe.cs | 25 +++ 15 files changed, 348 insertions(+), 131 deletions(-) diff --git a/sandbox/MicroBenchmark/JSPublishBench.cs b/sandbox/MicroBenchmark/JSPublishBench.cs index 2bbbc73b9..4c9c5d173 100644 --- a/sandbox/MicroBenchmark/JSPublishBench.cs +++ b/sandbox/MicroBenchmark/JSPublishBench.cs @@ -15,13 +15,16 @@ public class JSPublishBench private NatsConnection _nats; private NatsJSContext _js; + [Params(NatsRequestReplyMode.Direct, NatsRequestReplyMode.SharedInbox)] + public NatsRequestReplyMode Mode { get; set; } + [Params(1, 10, 1_000)] public int Batch { get; set; } [GlobalSetup] public async Task Setup() { - _nats = new NatsConnection(); + _nats = new NatsConnection(new NatsOpts { RequestReplyMode = Mode }); _js = new NatsJSContext(_nats); await _nats.ConnectAsync(); await _js.CreateStreamAsync(new StreamConfig("bench_test1", ["bench_test1"])); diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index 60b981988..60d5a12b3 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -16,10 +16,13 @@ public class KvBench private NatsKVContext _kv; private NatsKVStore _store; + [Params(NatsRequestReplyMode.Direct, NatsRequestReplyMode.SharedInbox)] + public NatsRequestReplyMode Mode { get; set; } + [GlobalSetup] public async Task SetupAsync() { - _nats = new NatsConnection(); + _nats = new NatsConnection(new NatsOpts { RequestReplyMode = Mode }); _js = new NatsJSContext(_nats); _kv = new NatsKVContext(_js); _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 9428bb1ef..dbb4eddef 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -168,6 +168,47 @@ public async ValueTask> TryPublishAsync( for (var i = 0; i < retryMax; i++) { + if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct) + { + var noReply = false; + NatsMsg msg; + try + { + msg = await Connection.RequestAsync( + subject: subject, + data: data, + headers: headers, + requestSerializer: serializer, + replySerializer: NatsJSJsonSerializer.Default, + requestOpts: opts, + replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, + cancellationToken).ConfigureAwait(false); + } + catch (NatsNoReplyException) + { + noReply = true; + msg = default; + } + catch (Exception ex) + { + return ex; + } + + if (noReply || msg.HasNoResponders) + { + _logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax); + await Task.Delay(retryWait, cancellationToken); + continue; + } + + if (msg.Data == null) + { + return new NatsJSException("No response data received"); + } + + return msg.Data; + } + await using var sub = await Connection.CreateRequestSubAsync( subject: subject, data: data, @@ -353,6 +394,52 @@ internal async ValueTask>> TryJSRequestAsyn // Validator.ValidateObject(request, new ValidationContext(request)); } + if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct) + { + NatsMsg> msg; + try + { + msg = await Connection.RequestAsync>( + subject: subject, + data: request, + headers: null, + replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, + requestSerializer: NatsJSJsonSerializer.Default, + replySerializer: NatsJSJsonDocumentSerializer.Default, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (NatsNoReplyException) + { + return new NatsJSApiNoResponseException(); + } + catch (NatsException e) + { + return e; + } + + if (msg.HasNoResponders) + { + return new NatsNoRespondersException(); + } + + if (msg.Error is { } messageError) + { + return messageError; + } + + if (msg.Data.HasException) + { + return msg.Data.Exception; + } + + if (msg.Data.HasError) + { + return new NatsJSResponse(null, msg.Data.Error); + } + + return new NatsJSResponse(msg.Data.Value, null); + } + await using var sub = await Connection.CreateRequestSubAsync>( subject: subject, data: request, diff --git a/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs index 5dbb89f2d..f22eff883 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs @@ -6,8 +6,10 @@ namespace NATS.Client.JetStream.Tests; public class ConsumeResponseTest { - [Fact] - public async Task Consume_response() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Consume_response(NatsRequestReplyMode mode) { var headers = new Stack(); headers.Push("NATS/1.0 400 Bad Test Request"); @@ -35,7 +37,7 @@ public async Task Consume_response() }); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await using var nats = new NatsConnection(new NatsOpts { Url = ms.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = ms.Url, RequestReplyMode = mode }); var js = nats.CreateJetStreamContext(); var consumer = await js.GetConsumerAsync("x", "x", cts.Token); diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index ff085bc48..ed1ed755d 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -15,11 +15,13 @@ public ConsumerFetchTest(ITestOutputHelper output, NatsServerFixture server) _server = server; } - [Fact] - public async Task Fetch_test() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Fetch_test(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); @@ -45,11 +47,13 @@ public async Task Fetch_test() Assert.Equal(10, count); } - [Fact] - public async Task FetchNoWait_test() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task FetchNoWait_test(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token); @@ -73,10 +77,14 @@ public async Task FetchNoWait_test() Assert.Equal(10, count); } - [Fact] - public async Task Fetch_dispose_test() + [Theory] + + // TODO: Fix this test + // [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Fetch_dispose_test(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); @@ -104,8 +112,10 @@ public async Task Fetch_dispose_test() var signal2 = new WaitSignal(); var reader = Task.Run(async () => { + var x = 0; await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) { + _output.WriteLine($"rcv:{++x}"); await msg.AckAsync(cancellationToken: cts.Token); signal1.Pulse(); await signal2; @@ -124,6 +134,7 @@ await Retry.Until( async () => { var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); + _output.WriteLine($"pend1:{c.Info.NumAckPending}"); return c.Info.NumAckPending == 9; }, retryDelay: TimeSpan.FromSeconds(1), @@ -140,6 +151,7 @@ await Retry.Until( async () => { var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token); + _output.WriteLine($"pend:{c.Info.NumAckPending}"); return c.Info.NumAckPending == 0; }, retryDelay: TimeSpan.FromSeconds(1), diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs index 92ceef15b..3ef4feffd 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs @@ -19,10 +19,12 @@ public ConsumerSetupTest(ITestOutputHelper output, NatsServerFixture server) _server = server; } - [Fact] - public async Task Create_push_consumer() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create_push_consumer(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); @@ -82,10 +84,12 @@ await js.CreateOrUpdateConsumerAsync( Assert.Equal(pauseUntil, config.PauseUntil); } - [Fact] - public async Task Consumer_config() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Consumer_config(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); var js = new NatsJSContextFactory().CreateContext(nats); diff --git a/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs b/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs index c7d3cd3b9..fa19e9ac9 100644 --- a/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs +++ b/tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs @@ -17,11 +17,13 @@ public DoubleAckNakDelayTests(ITestOutputHelper output, NatsServerFixture server _server = server; } - [Fact] - public async Task Double_ack_received_messages() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Double_ack_received_messages(NatsRequestReplyMode mode) { var proxy = _server.CreateProxy(); - await using var nats = proxy.CreateNatsConnection(); + await using var nats = proxy.CreateNatsConnection(mode); await nats.ConnectRetryAsync(); var prefix = _server.GetNextId(); @@ -53,11 +55,13 @@ public async Task Double_ack_received_messages() } } - [Fact] - public async Task Delay_nak_received_messages() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Delay_nak_received_messages(NatsRequestReplyMode mode) { var proxy = _server.CreateProxy(); - await using var nats = proxy.CreateNatsConnection(); + await using var nats = proxy.CreateNatsConnection(mode); await nats.ConnectRetryAsync(); var prefix = _server.GetNextId(); diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index c4cb878a3..5f6438d00 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -49,11 +49,13 @@ public async Task Stream_invalid_name_test(string? streamName) await Assert.ThrowsAnyAsync(async () => await jsmContext.DeleteMessageAsync(streamName!, new StreamMsgDeleteRequest())); } - [Fact] - public async Task Create_stream_test() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create_stream_test(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10) }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode }); // Happy user { diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs index a664c2a5d..19713d0a0 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs @@ -17,10 +17,12 @@ public ManageConsumerTest(ITestOutputHelper output, NatsServerFixture server) _server = server; } - [Fact] - public async Task Create_get_consumer() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create_get_consumer(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10) }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode }); await nats.ConnectRetryAsync(); var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); @@ -47,10 +49,12 @@ public async Task Create_get_consumer() } } - [Fact] - public async Task List_delete_consumer() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task List_delete_consumer(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); await nats.ConnectRetryAsync(); var prefix = _server.GetNextId(); var js = new NatsJSContext(nats); diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs index cc0f8355d..2f938df3a 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs @@ -18,11 +18,13 @@ public ManageStreamTest(ITestOutputHelper output, NatsServerFixture server) _server = server; } - [Fact] - public async Task Account_info_create_get_update_stream() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Account_info_create_get_update_stream(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); await nats.ConnectRetryAsync(); var js = new NatsJSContext(nats); @@ -67,10 +69,12 @@ public async Task Account_info_create_get_update_stream() } } - [Fact] - public async Task List_delete_stream() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task List_delete_stream(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId() + "-"; await nats.ConnectRetryAsync(); @@ -113,10 +117,12 @@ public async Task List_delete_stream() } } - [Fact] - public async Task Delete_one_msg() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Delete_one_msg(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); await nats.ConnectRetryAsync(); @@ -145,13 +151,15 @@ public async Task Delete_one_msg() Assert.Equal(2, stream.Info.State.Subjects?.Count); } - [Fact] - public async Task Create_or_update_stream_should_be_create_stream_if_stream_doesnt_exist() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create_or_update_stream_should_be_create_stream_if_stream_doesnt_exist(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); await nats.ConnectRetryAsync(); var js = new NatsJSContext(nats); @@ -167,10 +175,12 @@ public async Task Create_or_update_stream_should_be_create_stream_if_stream_does Assert.Equal(1, accountInfoAfter.Streams); } - [Fact] - public async Task Create_or_update_stream_should_be_update_stream_if_stream_exist() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create_or_update_stream_should_be_update_stream_if_stream_exist(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); await nats.ConnectRetryAsync(); @@ -189,10 +199,12 @@ public async Task Create_or_update_stream_should_be_update_stream_if_stream_exis Assert.True(updatedStream.Info.Config.NoAck); } - [Fact] - public async Task Create_or_update_stream_should_be_throwing_update_operation_errors() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create_or_update_stream_should_be_throwing_update_operation_errors(NatsRequestReplyMode mode) { - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); await nats.ConnectRetryAsync(); diff --git a/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs b/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs index 5dd376d9f..21ce4d10a 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishRetryTest.cs @@ -17,11 +17,13 @@ public PublishRetryTest(ITestOutputHelper output, NatsServerFixture server) _server = server; } - [Fact] - public async Task Publish_without_telemetry() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Publish_without_telemetry(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode }); var prefix = _server.GetNextId(); // Without telemetry diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs index 1aa0d0060..0546472b7 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -21,10 +21,17 @@ public PublishTest(ITestOutputHelper output, NatsServerFixture server) _server = server; } - [Fact] - public async Task Publish_test() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Publish_test(NatsRequestReplyMode mode) { - await using var nats = _server.CreateNatsConnection(); + await using var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + ConnectTimeout = TimeSpan.FromSeconds(10), + RequestReplyMode = mode, + }); await nats.ConnectRetryAsync(); var prefix = _server.GetNextId(); @@ -173,8 +180,10 @@ public async Task Publish_test() } } - [Fact] - public async Task Publish_retry_test() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Publish_retry_test(NatsRequestReplyMode mode) { var retryCount = 0; var logger = new InMemoryTestLoggerFactory(LogLevel.Debug, log => @@ -192,6 +201,7 @@ public async Task Publish_retry_test() ConnectTimeout = TimeSpan.FromSeconds(10), RequestTimeout = TimeSpan.FromSeconds(3), // give enough time for retries to avoid NatsJSPublishNoResponseExceptions LoggerFactory = logger, + RequestReplyMode = mode, }); var prefix = _server.GetNextId(); @@ -275,11 +285,13 @@ await Assert.ThrowsAsync(async () => } } - [Fact] - public async Task Publish_no_responders() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Publish_no_responders(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = nats.CreateJetStreamContext(); var result = await js.TryPublishAsync("foo", 1); Assert.IsType(result.Error); diff --git a/tests/NATS.Client.JetStream.Tests/Utils.cs b/tests/NATS.Client.JetStream.Tests/Utils.cs index d72e65a0f..e26bae283 100644 --- a/tests/NATS.Client.JetStream.Tests/Utils.cs +++ b/tests/NATS.Client.JetStream.Tests/Utils.cs @@ -15,11 +15,12 @@ public static ValueTask CreateStreamAsync(this NatsJSContext cont public static NatsProxy CreateProxy(this NatsServerFixture server) => new(new Uri(server.Url).Port); - public static NatsConnection CreateNatsConnection(this NatsProxy proxy) + public static NatsConnection CreateNatsConnection(this NatsProxy proxy, NatsRequestReplyMode mode = NatsRequestReplyMode.SharedInbox) => new(new NatsOpts { Url = $"nats://127.0.0.1:{proxy.Port}", ConnectTimeout = TimeSpan.FromSeconds(10), + RequestReplyMode = mode, }); public static NatsConnection CreateNatsConnection(this NatsServerFixture server) diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index c78e71305..407b2fb1a 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -12,11 +12,13 @@ public class KeyValueStoreTest public KeyValueStoreTest(ITestOutputHelper output) => _output = output; - [Fact] - public async Task Simple_create_put_get_test() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Simple_create_put_get_test(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -31,11 +33,13 @@ public async Task Simple_create_put_get_test() Assert.Equal("v1", entry.Value); } - [Fact] - public async Task Handle_non_direct_gets() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Handle_non_direct_gets(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -69,14 +73,16 @@ await js.CreateStreamAsync(new StreamConfig Assert.Equal("v1", entry.Value); } - [Fact] - public async Task Get_keys() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Get_keys(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -106,14 +112,16 @@ public async Task Get_keys() Assert.Equal(total, count); } - [Fact] - public async Task Get_key_revisions() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Get_key_revisions(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -164,14 +172,16 @@ public async Task Get_key_revisions() } } - [Fact] - public async Task Delete_and_purge() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Delete_and_purge(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -290,14 +300,16 @@ await Assert.ThrowsAsync(async () => } } - [Fact] - public async Task Purge_deletes() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Purge_deletes(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -372,14 +384,16 @@ public async Task Purge_deletes() await store.PurgeDeletesAsync(opts: new NatsKVPurgeOpts { DeleteMarkersThreshold = TimeSpan.Zero }, cancellationToken: cancellationToken); } - [Fact] - public async Task Update_with_revisions() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Update_with_revisions(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -410,14 +424,16 @@ await Assert.ThrowsAsync(async () => } } - [Fact] - public async Task Create() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Create(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -479,14 +495,16 @@ public async Task TestMessageTTLApiNotSupportedupport() _output.WriteLine(exception.Message); } - [SkipIfNatsServer(versionEarlierThan: "2.11")] - public async Task TestMessageTTL() + [SkipIfNatsServerTheory(versionEarlierThan: "2.11")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task TestMessageTTL(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -539,14 +557,16 @@ await Retry.Until( Assert.Equal(20ul, state.Info.State.LastSeq); } - [SkipIfNatsServer(versionEarlierThan: "2.11")] - public async Task TestTTLMessageWhenTTLDisabledOnStream() + [SkipIfNatsServerTheory(versionEarlierThan: "2.11")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task TestTTLMessageWhenTTLDisabledOnStream(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -556,14 +576,16 @@ public async Task TestTTLMessageWhenTTLDisabledOnStream() Assert.Equal("This store does not support TTL", exception.Message); } - [SkipIfNatsServer(versionEarlierThan: "2.11")] - public async Task SetsSubjectDeleteMarkerTTL() + [SkipIfNatsServerTheory(versionEarlierThan: "2.11")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task SetsSubjectDeleteMarkerTTL(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -573,11 +595,13 @@ public async Task SetsSubjectDeleteMarkerTTL() Assert.Equal(TimeSpan.FromSeconds(2), info.Info.Config.SubjectDeleteMarkerTTL); } - [SkipIfNatsServer(versionEarlierThan: "2.11")] - public async Task SubjectDeleteMarkerTTL_enabled_removals_should_be_interpreted_as_Operation_Purge() + [SkipIfNatsServerTheory(versionEarlierThan: "2.11")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task SubjectDeleteMarkerTTL_enabled_removals_should_be_interpreted_as_Operation_Purge(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -622,14 +646,16 @@ public async Task SubjectDeleteMarkerTTL_enabled_removals_should_be_interpreted_ Assert.Equal(3ul, r2); } - [SkipIfNatsServer(versionEarlierThan: "2.11")] - public async Task TestMessageNeverExpire() + [SkipIfNatsServerTheory(versionEarlierThan: "2.11")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task TestMessageNeverExpire(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -667,14 +693,16 @@ await Retry.Until( Assert.Equal(21ul, state.Info.State.LastSeq); } - [Fact] - public async Task History() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task History(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -711,14 +739,16 @@ public async Task History() } } - [Fact] - public async Task Status() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Status(NatsRequestReplyMode mode) { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -750,11 +780,13 @@ public async Task Status() } } - [SkipIfNatsServer(versionEarlierThan: "2.10")] - public async Task Compressed_storage() + [SkipIfNatsServerTheory(versionEarlierThan: "2.10")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Compressed_storage(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -779,11 +811,13 @@ public async Task Compressed_storage() Assert.Equal(StreamConfigCompression.S2, status2.Info.Config.Compression); } - [Fact] - public async Task Validate_keys() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Validate_keys(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -832,8 +866,10 @@ public async Task Validate_keys() } } - [Fact] - public async Task TestDirectMessageRepublishedSubject() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task TestDirectMessageRepublishedSubject(NatsRequestReplyMode mode) { var streamBucketName = "sb-" + Nuid.NewNuid(); var subject = "test"; @@ -846,7 +882,7 @@ public async Task TestDirectMessageRepublishedSubject() var streamConfig = new StreamConfig(streamBucketName, new[] { streamSubject }) { Republish = new Republish { Src = ">", Dest = republishDest } }; await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -873,11 +909,13 @@ public async Task TestDirectMessageRepublishedSubject() Assert.Equal("tres", kve3.Value); } - [SkipIfNatsServer(versionEarlierThan: "2.10")] - public async Task Test_CombinedSources() + [SkipIfNatsServerTheory(versionEarlierThan: "2.10")] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Test_CombinedSources(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -920,11 +958,13 @@ await Retry.Until( Assert.Equal("b_fromStore2", entryB.Value); } - [Fact] - public async Task Try_Create() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Try_Create(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -954,11 +994,13 @@ public async Task Try_Create() Assert.ThrowsAny(() => finalValue.Error); } - [Fact] - public async Task Try_Delete() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Try_Delete(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); @@ -979,11 +1021,13 @@ public async Task Try_Delete() Assert.ThrowsAny(() => updateResultSuccess.Error); } - [Fact] - public async Task Try_Update() + [Theory] + [InlineData(NatsRequestReplyMode.Direct)] + [InlineData(NatsRequestReplyMode.SharedInbox)] + public async Task Try_Update(NatsRequestReplyMode mode) { await using var server = await NatsServerProcess.StartAsync(); - await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestReplyMode = mode }); var js = new NatsJSContext(nats); var kv = new NatsKVContext(js); diff --git a/tests/NATS.Client.TestUtilities/NatsServerExe.cs b/tests/NATS.Client.TestUtilities/NatsServerExe.cs index 3645ecca9..4fcd8f29b 100644 --- a/tests/NATS.Client.TestUtilities/NatsServerExe.cs +++ b/tests/NATS.Client.TestUtilities/NatsServerExe.cs @@ -93,6 +93,31 @@ public SkipIfNatsServer(bool doesNotSupportTlsFirst = false, string? versionEarl } } +public sealed class SkipIfNatsServerTheory : TheoryAttribute +{ + private static readonly bool SupportsTlsFirst; + + static SkipIfNatsServerTheory() => SupportsTlsFirst = NatsServerExe.SupportsTlsFirst(); + + public SkipIfNatsServerTheory(bool doesNotSupportTlsFirst = false, string? versionEarlierThan = default, string? versionLaterThan = default) + { + if (doesNotSupportTlsFirst && !SupportsTlsFirst) + { + Skip = "NATS server doesn't support TLS first"; + } + + if (versionEarlierThan != null && new Version(versionEarlierThan) > NatsServerExe.Version) + { + Skip = $"NATS server version ({NatsServerExe.Version}) is earlier than {versionEarlierThan}"; + } + + if (versionLaterThan != null && new Version(versionLaterThan) < NatsServerExe.Version) + { + Skip = $"NATS server version ({NatsServerExe.Version}) is later than {versionLaterThan}"; + } + } +} + public sealed class SkipOnPlatform : FactAttribute { public SkipOnPlatform(string platform, string reason) From b48efb77639bd736af1a4f492c64cbec210590f8 Mon Sep 17 00:00:00 2001 From: mtmk Date: Sat, 24 May 2025 01:58:20 +0100 Subject: [PATCH 08/10] Release 2.6.2-preview.1 (#868) * Add RequestReplyMode for JetStream (#867) --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index 6a6a3d8e3..67a287611 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -2.6.1 +2.6.2-preview.1 From 7a7449740687753275c20f61d40bc288c43275d8 Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Sun, 25 May 2025 19:03:42 -0400 Subject: [PATCH 09/10] Add `ParentContext` to `NatsInstrumentationContext` --- .../Internal/NatsInstrumentationContext.cs | 5 ++++- src/NATS.Client.Core/Internal/Telemetry.cs | 12 +++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs b/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs index ca0ee066e..20de553b2 100644 --- a/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs +++ b/src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs @@ -1,3 +1,5 @@ +using System.Diagnostics; + namespace NATS.Client.Core.Internal; public readonly record struct NatsInstrumentationContext( @@ -7,4 +9,5 @@ public readonly record struct NatsInstrumentationContext( string? QueueGroup, long? BodySize, long? Size, - INatsConnection? Connection); + INatsConnection? Connection, + ActivityContext? ParentContext); diff --git a/src/NATS.Client.Core/Internal/Telemetry.cs b/src/NATS.Client.Core/Internal/Telemetry.cs index 2ec9aa168..c955af7a3 100644 --- a/src/NATS.Client.Core/Internal/Telemetry.cs +++ b/src/NATS.Client.Core/Internal/Telemetry.cs @@ -29,7 +29,8 @@ internal static class Telemetry QueueGroup: null, BodySize: null, Size: null, - Connection: connection); + Connection: connection, + ParentContext: parentContext); if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext)) return null; @@ -125,6 +126,9 @@ public static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? h if (!NatsActivities.HasListeners()) return null; + if (headers is null || !TryParseTraceContext(headers, out var context)) + context = default; + var instrumentationContext = new NatsInstrumentationContext( Subject: subject, Headers: headers, @@ -132,7 +136,8 @@ public static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? h QueueGroup: queueGroup, BodySize: bodySize, Size: size, - Connection: connection); + Connection: connection, + ParentContext: context); if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext)) return null; @@ -190,9 +195,6 @@ public static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? h tags[9] = new KeyValuePair(Constants.ReplyTo, replyTo); } - if (headers is null || !TryParseTraceContext(headers, out var context)) - context = default; - var activity = NatsActivities.StartActivity( name, kind: ActivityKind.Consumer, From 6f79e2e652ae3318432775add580b06755f666c9 Mon Sep 17 00:00:00 2001 From: Arad Alvand Date: Tue, 27 May 2025 20:46:27 -0400 Subject: [PATCH 10/10] Make `GetActivityContext` public to provide the ability to get context activity context --- src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs index 4807f55c3..a5effb37a 100644 --- a/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs +++ b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs @@ -26,5 +26,7 @@ public static class NatsMsgTelemetryExtensions tags: tags); } - internal static ActivityContext GetActivityContext(this in NatsMsg msg) => msg.Headers?.Activity?.Context ?? default; + /// Gets the activity context associated with the NatsMsg. + public static ActivityContext GetActivityContext(this in NatsMsg msg) => + msg.Headers?.Activity?.Context ?? default; }