From eaadaabcc481d962d18ab409fca9ad505d81e2b4 Mon Sep 17 00:00:00 2001 From: Vyacheslav Brevnov Date: Tue, 25 Feb 2025 11:20:42 +0300 Subject: [PATCH] feat: + Reduce memory usage --- RabbitMQ.Stream.Client.PerfTest/Program.fs | 2 +- RabbitMQ.Stream.Client/Message.cs | 39 ++++++++++++++++++- RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 3 ++ .../Reliable/ConfirmationPipe.cs | 21 ++++++---- 4 files changed, 56 insertions(+), 9 deletions(-) diff --git a/RabbitMQ.Stream.Client.PerfTest/Program.fs b/RabbitMQ.Stream.Client.PerfTest/Program.fs index 8f83e61c..ab341f1e 100644 --- a/RabbitMQ.Stream.Client.PerfTest/Program.fs +++ b/RabbitMQ.Stream.Client.PerfTest/Program.fs @@ -40,7 +40,7 @@ let main argv = let! producer = system.CreateRawProducer producerConfig //make producer available to metrics async prod <- producer - let msg = Message "asdf"B + let msg = new Message "asdf"B while run do let! _ = producer.Send(publishingId, msg) publishingId <- publishingId + 1UL diff --git a/RabbitMQ.Stream.Client/Message.cs b/RabbitMQ.Stream.Client/Message.cs index 8c1c116d..70bee978 100644 --- a/RabbitMQ.Stream.Client/Message.cs +++ b/RabbitMQ.Stream.Client/Message.cs @@ -9,8 +9,17 @@ namespace RabbitMQ.Stream.Client { - public class Message + public class Message : IDisposable { + private bool _disposedValue; + private IMemoryOwner _memory; + + public Message(IMemoryOwner memory, int payloadSize) + { + _memory = memory; + Data = new Data(new ReadOnlySequence(memory.Memory.Slice(0, payloadSize))); + } + public Message(byte[] data) : this(new Data(new ReadOnlySequence(data))) { } @@ -157,5 +166,33 @@ public static Message From(ref SequenceReader reader, uint len) }; return msg; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + +#pragma warning disable IDE0060 // Remove unused parameter + private void Dispose(bool disposing) +#pragma warning restore IDE0060 // Remove unused parameter + { + if (!_disposedValue) + { + try + { + _memory?.Dispose(); + _memory = null; + } + catch + { + // ignore + } + + _disposedValue = true; + } + } + + ~Message() => Dispose(false); } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 53c35247..fa21fc16 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -382,8 +382,11 @@ RabbitMQ.Stream.Client.Message.Annotations.get -> RabbitMQ.Stream.Client.AMQP.An RabbitMQ.Stream.Client.Message.ApplicationProperties.get -> RabbitMQ.Stream.Client.AMQP.ApplicationProperties RabbitMQ.Stream.Client.Message.ApplicationProperties.set -> void RabbitMQ.Stream.Client.Message.Data.get -> RabbitMQ.Stream.Client.AMQP.Data +RabbitMQ.Stream.Client.Message.Message(System.Buffers.IMemoryOwner memory, int payloadSize) -> void RabbitMQ.Stream.Client.Message.Message(byte[] data) -> void RabbitMQ.Stream.Client.Message.Message(RabbitMQ.Stream.Client.AMQP.Data data) -> void +RabbitMQ.Stream.Client.Message.Dispose() -> void +RabbitMQ.Stream.Client.Message.~Message() -> void RabbitMQ.Stream.Client.Message.MessageHeader.get -> RabbitMQ.Stream.Client.AMQP.Header RabbitMQ.Stream.Client.Message.Properties.get -> RabbitMQ.Stream.Client.AMQP.Properties RabbitMQ.Stream.Client.Message.Properties.set -> void diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs index 62d7d0d9..53c907a7 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs @@ -146,14 +146,21 @@ internal void AddUnConfirmedMessage(ulong publishingId, Message message) internal void AddUnConfirmedMessage(ulong publishingId, List messages) { - _waitForConfirmation.TryAdd(publishingId, - new MessagesConfirmation + var messagesConfirmation = new MessagesConfirmation + { + // We need to copy the messages because the user can reuse the same message or deleted them. + Messages = new List(messages), + PublishingId = publishingId, + InsertDateTime = DateTime.Now + }; + + if (!_waitForConfirmation.TryAdd(publishingId, messagesConfirmation)) + { + foreach (var message in messages) { - // We need to copy the messages because the user can reuse the same message or deleted them. - Messages = new List(messages), - PublishingId = publishingId, - InsertDateTime = DateTime.Now - }); + message.Dispose(); + } + } } internal async Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId,