How to create non-blocking consumer and call delegate on new message received #588
Answered
by
mtmk
rdkumavat1
asked this question in
Q&A
Replies: 1 comment 3 replies
-
// dotnet add package NATS.Net
// dotnet add package Microsoft.Extensions.Logging.Console
var opts = new NatsOpts
{
LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
};
await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync(new StreamConfig("MY_STREAM", ["events.>"]));
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("MY_CONSUMER"));
var cts = new CancellationTokenSource();
async Task MessageDelegate(NatsJSMsg<string> msg)
{
Console.WriteLine($"processing: {msg}");
await msg.AckAsync(cancellationToken: cts.Token);
}
var consumeTask = Task.Run(async () =>
{
await foreach (var msg in consumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
await MessageDelegate(msg);
}
});
// Stop application
await cts.CancelAsync();
await consumeTask; |
Beta Was this translation helpful? Give feedback.
3 replies
Answer selected by
rickdotnet
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
How to create a non-blocking consumer subscription and call a method with a delegate on new message publish.
Beta Was this translation helpful? Give feedback.
All reactions