Skip to content

Commit 97b7d10

Browse files
authored
upgrade jetstream library to nats.net (#283)
* upgrade jetstream library to nats.net
1 parent 55ddc88 commit 97b7d10

File tree

7 files changed

+101
-467
lines changed

7 files changed

+101
-467
lines changed

NBB.Messaging.slnf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"src\\Messaging\\NBB.Messaging.Effects\\NBB.Messaging.Effects.csproj",
77
"src\\Messaging\\NBB.Messaging.Host\\NBB.Messaging.Host.csproj",
88
"src\\Messaging\\NBB.Messaging.InProcessMessaging\\NBB.Messaging.InProcessMessaging.csproj",
9+
"src\\Messaging\\NBB.Messaging.JetStream\\NBB.Messaging.JetStream.csproj",
910
"src\\Messaging\\NBB.Messaging.MultiTenancy\\NBB.Messaging.MultiTenancy.csproj",
1011
"src\\Messaging\\NBB.Messaging.Nats\\NBB.Messaging.Nats.csproj",
1112
"src\\Messaging\\NBB.Messaging.Noop\\NBB.Messaging.Noop.csproj",

src/Messaging/NBB.Messaging.JetStream/DependencyInjectionExtensions.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ public static IServiceCollection AddJetStreamTransport(this IServiceCollection s
1515
{
1616
services.Configure<JetStreamOptions>(configuration.GetSection("Messaging").GetSection("JetStream"));
1717
services.AddSingleton<JetStreamConnectionProvider>();
18-
services.AddSingleton<IMessagingTransport, JetStreamMessagingTransport>();
19-
services.AddSingleton<ITransportMonitor>(sp => sp.GetRequiredService<JetStreamConnectionProvider>());
18+
services.AddSingleton<JetStreamMessagingTransport>();
19+
services.AddSingleton<IMessagingTransport>(sp => sp.GetRequiredService<JetStreamMessagingTransport>());
20+
services.AddSingleton<ITransportMonitor>(sp => sp.GetRequiredService<JetStreamMessagingTransport>());
2021

2122
return services;
2223
}

src/Messaging/NBB.Messaging.JetStream/Internal/JetStreamConnectionProvider.cs

Lines changed: 20 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,35 @@
33

44
using Microsoft.Extensions.Logging;
55
using Microsoft.Extensions.Options;
6-
using NATS.Client;
7-
using NBB.Messaging.Abstractions;
6+
using NATS.Client.Core;
87
using System;
9-
using System.Threading;
108
using System.Threading.Tasks;
119

1210
namespace NBB.Messaging.JetStream.Internal
1311
{
14-
public class JetStreamConnectionProvider : IDisposable, ITransportMonitor
12+
public class JetStreamConnectionProvider : IAsyncDisposable
1513
{
1614
private readonly IOptions<JetStreamOptions> _natsOptions;
1715
private readonly ILogger<JetStreamConnectionProvider> _logger;
18-
private IConnection _connection;
16+
private readonly ILoggerFactory _loggerFactory;
17+
private INatsConnection _connection;
1918
private static readonly object InstanceLoker = new();
20-
private Exception _unrecoverableException;
2119

22-
public event TransportErrorHandler OnError;
23-
24-
public JetStreamConnectionProvider(IOptions<JetStreamOptions> natsOptions, ILogger<JetStreamConnectionProvider> logger)
20+
public JetStreamConnectionProvider(IOptions<JetStreamOptions> natsOptions,
21+
ILogger<JetStreamConnectionProvider> logger, ILoggerFactory loggerFactory)
2522
{
2623
_natsOptions = natsOptions;
2724
_logger = logger;
25+
_loggerFactory = loggerFactory;
2826
}
2927

30-
public async Task ExecuteAsync(Func<IConnection, Task> action)
31-
{
32-
var connection = GetAndCheckConnection();
33-
34-
await action(connection);
35-
}
36-
37-
public void Execute(Action<IConnection> action)
28+
public INatsConnection GetConnection()
3829
{
3930
var connection = GetAndCheckConnection();
40-
41-
action(connection);
31+
return connection;
4232
}
4333

44-
private IConnection GetAndCheckConnection()
34+
private INatsConnection GetAndCheckConnection()
4535
{
4636
if (_connection == null)
4737
lock (InstanceLoker)
@@ -52,53 +42,27 @@ private IConnection GetAndCheckConnection()
5242
return _connection;
5343
}
5444

55-
private IConnection CreateConnection()
45+
private INatsConnection CreateConnection()
5646
{
57-
var options = ConnectionFactory.GetDefaultOptions();
58-
options.Url = _natsOptions.Value.NatsUrl;
59-
60-
//https://github.com/nats-io/nats.net/issues/804
61-
options.AllowReconnect = false;
47+
var options = NatsOpts.Default with { Url = _natsOptions.Value.NatsUrl, LoggerFactory = _loggerFactory };
6248

63-
options.ClosedEventHandler += (_, args) =>
64-
{
65-
SetConnectionLostState(args.Error ?? new Exception("NATS Jetstream connection was lost"));
66-
};
67-
68-
_connection = new ConnectionFactory().CreateConnection(options);
49+
_connection = new NatsConnection(options);
50+
//_connection.ConnectionDisconnected += Connection_ConnectionDisconnected;
51+
//_connection.ReconnectFailed += Connection_ReconnectFailed;
6952
_logger.LogInformation($"NATS Jetstream connection to {_natsOptions.Value.NatsUrl} was established");
7053

7154
return _connection;
7255
}
7356

74-
private void SetConnectionLostState(Exception exception)
75-
{
76-
_connection = null;
77-
78-
// Set the field to the current exception if not already set
79-
var existingException = Interlocked.CompareExchange(ref _unrecoverableException, exception, null);
80-
81-
// Send the application stop signal only once
82-
if (existingException != null)
83-
return;
84-
85-
_logger.LogError(exception, "NATS Jetstream connection unrecoverable");
86-
87-
OnError?.Invoke(exception);
88-
}
89-
90-
public void Dispose()
57+
private ValueTask Connection_ConnectionDisconnected(object sender, NatsEventArgs args)
9158
{
92-
Dispose(true);
93-
GC.SuppressFinalize(this);
59+
_logger.LogWarning($"NATS Jetstream connection was disconnected, {args.Message}");
60+
return ValueTask.CompletedTask;
9461
}
9562

96-
protected virtual void Dispose(bool disposing)
63+
public ValueTask DisposeAsync()
9764
{
98-
if (disposing)
99-
{
100-
_connection?.Dispose();
101-
}
65+
return _connection?.DisposeAsync() ?? ValueTask.CompletedTask;
10266
}
10367
}
10468
}

src/Messaging/NBB.Messaging.JetStream/JetStreamMessagingTransport.cs

Lines changed: 76 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,96 +2,103 @@
22
// This source code is licensed under the MIT license.
33

44
using Microsoft.Extensions.Options;
5-
using NATS.Client;
65
using NATS.Client.JetStream;
6+
using NATS.Client.JetStream.Models;
77
using NBB.Messaging.Abstractions;
88
using NBB.Messaging.JetStream.Internal;
99
using System;
1010
using System.Threading;
1111
using System.Threading.Tasks;
1212

13-
namespace NBB.Messaging.JetStream
14-
{
15-
public class JetStreamMessagingTransport : IMessagingTransport
16-
{
17-
private readonly IOptions<JetStreamOptions> _natsOptions;
18-
private readonly JetStreamConnectionProvider _natsConnectionManager;
13+
namespace NBB.Messaging.JetStream;
1914

20-
public JetStreamMessagingTransport(IOptions<JetStreamOptions> natsOptions, JetStreamConnectionProvider natsConnectionManager)
21-
{
22-
_natsOptions = natsOptions;
23-
_natsConnectionManager = natsConnectionManager;
24-
}
15+
public class JetStreamMessagingTransport : IMessagingTransport, ITransportMonitor
16+
{
17+
private readonly IOptions<JetStreamOptions> _natsOptions;
18+
private readonly JetStreamConnectionProvider _natsConnectionManager;
2519

26-
public Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default)
27-
{
28-
var envelopeData = sendContext.EnvelopeBytesAccessor.Invoke();
20+
public JetStreamMessagingTransport(IOptions<JetStreamOptions> natsOptions, JetStreamConnectionProvider natsConnectionManager)
21+
{
22+
_natsOptions = natsOptions;
23+
_natsConnectionManager = natsConnectionManager;
24+
}
2925

30-
return _natsConnectionManager.ExecuteAsync(con =>
31-
{
32-
IJetStream js = con.CreateJetStreamContext();
33-
return js.PublishAsync(topic, envelopeData);
34-
});
35-
}
26+
public event TransportErrorHandler OnError;
3627

37-
public Task<IDisposable> SubscribeAsync(string topic,
38-
Func<TransportReceiveContext, Task> handler,
39-
SubscriptionTransportOptions options = null,
40-
CancellationToken cancellationToken = default)
41-
{
28+
public async Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default)
29+
{
30+
var envelopeData = sendContext.EnvelopeBytesAccessor.Invoke();
31+
await _natsConnectionManager.GetConnection().PublishAsync(topic, envelopeData, cancellationToken: cancellationToken);
32+
}
4233

43-
IDisposable consumer = null;
34+
public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
35+
SubscriptionTransportOptions options = null, CancellationToken token = default)
36+
{
37+
var stream = string.Empty;
38+
var js = new NatsJSContext(_natsConnectionManager.GetConnection());
39+
await foreach (var item in js.ListStreamNamesAsync(topic, token)) { stream = item; }
4440

45-
_natsConnectionManager.Execute(con =>
46-
{
47-
IJetStream js = con.CreateJetStreamContext();
41+
var subscriberOptions = options ?? SubscriptionTransportOptions.Default;
4842

49-
// set's up the stream
50-
var isCommand = topic.ToLower().Contains("commands.");
43+
var cc = new ConsumerConfig();
44+
if (subscriberOptions.IsDurable)
45+
{
46+
var clientId = (_natsOptions.Value.ClientId + "__" + topic).Replace(".", "_");
47+
cc.Name = clientId;
48+
cc.DurableName = clientId;
49+
}
5150

52-
var stream = isCommand ? _natsOptions.Value.CommandsStream : _natsOptions.Value.EventsStream;
53-
var jsm = con.CreateJetStreamManagementContext();
54-
jsm.GetStreamInfo(stream);
51+
if (subscriberOptions.DeliverNewMessagesOnly)
52+
cc.DeliverPolicy = ConsumerConfigDeliverPolicy.New;
5553

56-
// get stream context, create consumer and get the consumer context
57-
var streamContext = con.GetStreamContext(stream);
54+
cc.AckWait = TimeSpan.FromMilliseconds(subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000);
55+
cc.FilterSubject = topic;
56+
//cc.InactiveThreshold = TimeSpan.FromMinutes(5s);
57+
cc.AckPolicy = ConsumerConfigAckPolicy.Explicit;
5858

59-
var subscriberOptions = options ?? SubscriptionTransportOptions.Default;
60-
var ccb = ConsumerConfiguration.Builder();
59+
var consumeOptions = new NatsJSConsumeOpts
60+
{
61+
MaxMsgs = subscriberOptions.MaxConcurrentMessages,
62+
};
63+
var consumer = await js.CreateOrUpdateConsumerAsync(stream, cc, token);
6164

62-
if (subscriberOptions.IsDurable)
65+
var cts = new CancellationTokenSource();
66+
var t = Task.Run(async () =>
67+
{
68+
try
69+
{
70+
//await consumer.RefreshAsync(token);
71+
await foreach (var msg in consumer.ConsumeAsync<byte[]>(opts: consumeOptions, cancellationToken: cts.Token))
6372
{
64-
var clientId = (_natsOptions.Value.ClientId + topic).Replace(".", "_");
65-
ccb.WithDurable(clientId);
73+
var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(msg.Data));
74+
await handler(receiveContext);
75+
await msg.AckAsync(cancellationToken: cts.Token);
6676
}
77+
}
78+
//catch (NatsJSProtocolException e)
79+
//catch (NatsJSException e)
80+
catch (OperationCanceledException)
81+
{
82+
}
83+
catch (Exception e)
84+
{
85+
OnError?.Invoke(e);
86+
}
87+
});
6788

68-
if (subscriberOptions.DeliverNewMessagesOnly)
69-
ccb.WithDeliverPolicy(DeliverPolicy.New);
70-
else
71-
ccb.WithDeliverPolicy(DeliverPolicy.All);
72-
73-
ccb.WithAckWait(subscriberOptions.AckWait ?? _natsOptions.Value.AckWait ?? 50000);
74-
75-
//https://docs.nats.io/nats-concepts/jetstream/consumers#maxackpending
76-
ccb.WithMaxAckPending(subscriberOptions.MaxConcurrentMessages);
77-
ccb.WithFilterSubject(topic);
78-
79-
var consumerContext = streamContext.CreateOrUpdateConsumer(ccb.Build());
80-
81-
void NatsMsgHandler(object obj, MsgHandlerEventArgs args)
82-
{
83-
if (cancellationToken.IsCancellationRequested)
84-
return;
85-
86-
var receiveContext = new TransportReceiveContext(new TransportReceivedData.EnvelopeBytes(args.Message.Data));
87-
88-
// Fire and forget
89-
_ = handler(receiveContext).ContinueWith(_ => args.Message.Ack(), cancellationToken);
90-
}
91-
consumer = consumerContext.Consume(NatsMsgHandler);
89+
//should be asyncDisposable
90+
return new SubscriptionDisposable(() =>
91+
{
92+
cts.Cancel();
93+
t.Wait();
94+
});
95+
}
96+
}
9297

93-
});
94-
return Task.FromResult(consumer);
95-
}
98+
internal record SubscriptionDisposable(Action dispose) : IDisposable
99+
{
100+
public void Dispose()
101+
{
102+
dispose();
96103
}
97104
}

src/Messaging/NBB.Messaging.JetStream/JetStreamOptions.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ public class JetStreamOptions
1919
/// The time the server awaits for acknowledgement from the client before redelivering the message (in milliseconds)
2020
/// </summary>
2121
public int? AckWait { get; set; }
22-
public string CommandsStream { get; set; }
23-
public string EventsStream { get; set; }
2422

2523
}
2624
}

0 commit comments

Comments
 (0)