Skip to content

Reduce memory usage #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client.PerfTest/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ let main argv =
let! producer = system.CreateRawProducer producerConfig
//make producer available to metrics async
prod <- producer
let msg = Message "asdf"B
let msg = new Message "asdf"B
while run do
let! _ = producer.Send(publishingId, msg)
publishingId <- publishingId + 1UL
Expand Down
39 changes: 38 additions & 1 deletion RabbitMQ.Stream.Client/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@

namespace RabbitMQ.Stream.Client
{
public class Message
public class Message : IDisposable
{
private bool _disposedValue;
private IMemoryOwner<byte> _memory;

public Message(IMemoryOwner<byte> memory, int payloadSize)
{
_memory = memory;
Data = new Data(new ReadOnlySequence<byte>(memory.Memory.Slice(0, payloadSize)));
}

public Message(byte[] data) : this(new Data(new ReadOnlySequence<byte>(data)))
{
}
Expand Down Expand Up @@ -157,5 +166,33 @@ public static Message From(ref SequenceReader<byte> reader, uint len)
};
return msg;
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

#pragma warning disable IDE0060 // Remove unused parameter
private void Dispose(bool disposing)
#pragma warning restore IDE0060 // Remove unused parameter
{
if (!_disposedValue)
{
try
{
_memory?.Dispose();
_memory = null;
}
catch
{
// ignore
}

_disposedValue = true;
}
}

~Message() => Dispose(false);
}
}
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,11 @@ RabbitMQ.Stream.Client.Message.Annotations.get -> RabbitMQ.Stream.Client.AMQP.An
RabbitMQ.Stream.Client.Message.ApplicationProperties.get -> RabbitMQ.Stream.Client.AMQP.ApplicationProperties
RabbitMQ.Stream.Client.Message.ApplicationProperties.set -> void
RabbitMQ.Stream.Client.Message.Data.get -> RabbitMQ.Stream.Client.AMQP.Data
RabbitMQ.Stream.Client.Message.Message(System.Buffers.IMemoryOwner<byte> memory, int payloadSize) -> void
RabbitMQ.Stream.Client.Message.Message(byte[] data) -> void
RabbitMQ.Stream.Client.Message.Message(RabbitMQ.Stream.Client.AMQP.Data data) -> void
RabbitMQ.Stream.Client.Message.Dispose() -> void
RabbitMQ.Stream.Client.Message.~Message() -> void
RabbitMQ.Stream.Client.Message.MessageHeader.get -> RabbitMQ.Stream.Client.AMQP.Header
RabbitMQ.Stream.Client.Message.Properties.get -> RabbitMQ.Stream.Client.AMQP.Properties
RabbitMQ.Stream.Client.Message.Properties.set -> void
Expand Down
21 changes: 14 additions & 7 deletions RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,21 @@ internal void AddUnConfirmedMessage(ulong publishingId, Message message)

internal void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
{
_waitForConfirmation.TryAdd(publishingId,
new MessagesConfirmation
var messagesConfirmation = new MessagesConfirmation
{
// We need to copy the messages because the user can reuse the same message or deleted them.
Messages = new List<Message>(messages),
PublishingId = publishingId,
InsertDateTime = DateTime.Now
};

if (!_waitForConfirmation.TryAdd(publishingId, messagesConfirmation))
{
foreach (var message in messages)
{
// We need to copy the messages because the user can reuse the same message or deleted them.
Messages = new List<Message>(messages),
PublishingId = publishingId,
InsertDateTime = DateTime.Now
});
message.Dispose();
}
}
}

internal async Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId,
Expand Down