Replies: 4 comments
-
Thanks for using this client and RabbitMQ streams. We intentionally made the public API as small as possible. This makes it possible to change internals without requiring major version increments. If you can provide a use-case (code is ideal!) that would apply to the majority of users of this client we'd be interested to see it.
-
Sure. Just keep in mind that it's more of an idea than a final solution, and it's entirely possible I'm going in the wrong direction. Long story short, I'd like to have a Consumer that exposes IAsyncEnumerable rather than a callback for handling inbound messages. Both can get the job done, but from the perspective of an API consumer, IAsyncEnumerable is much easier to work with. It is possible to achieve my goal by wrapping RawConsumer, but that doesn't feel right because I'm mostly interested in handling client.Subscribe, and RawConsumer hides it. Or let's say I want to Unsubscribe if the stream has not been consumed for some period of time and re-Subscribe when required. Hopefully that makes sense. So, I've come up with the following code:

// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection.Metadata;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace RabbitMQ.Stream.Client.Extensions;

public class StreamMessage
{
    public IConsumer Consumer { get; init; }
    public MessageContext Context { get; init; }
    public Message Message { get; init; }
}

public record EnumerableConsumerConfig : IConsumerConfig
{
    public EnumerableConsumerConfig(string stream)
    {
        if (string.IsNullOrWhiteSpace(stream))
        {
            throw new ArgumentException("Stream cannot be null or whitespace.", nameof(stream));
        }

        Stream = stream;
    }

    public string SuperStream { get; set; }
    public IOffsetType OffsetSpec { get; set; } = new OffsetTypeNext();

    // stream name where the consumer will consume the messages.
    // stream must exist before the consumer is created.
    public string Stream { get; }
    public int BufferSize { get; set; } = 256;
    public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
}

internal struct ChunkConsumer
{
    private readonly IConsumer _consumer;
    private readonly Chunk _chunk;
    private readonly uint _totalMessages;
    private ChunkReader _reader;
    private uint _currentMessageIndex;
    private long _currentSequencePosition;

    public ChunkConsumer(in Chunk chunk, IConsumer consumer)
    {
        _chunk = chunk;
        _consumer = consumer;
        _currentSequencePosition = 0;
        _currentMessageIndex = 0;
        if (!_chunk.HasSubEntries)
        {
            _totalMessages = chunk.NumEntries;
            _reader = new ChunkReader(chunk.Data);
        }
        else
        {
            _totalMessages = chunk.NumRecords;
            _reader = SubEntryReader(chunk.Data, ref _currentSequencePosition);
        }
    }

    private static ChunkReader SubEntryReader(in ReadOnlySequence<byte> seq, ref long position)
    {
        var reader = new SequenceReader<byte>(seq);
        var consumed = SubEntryChunk.Read(ref reader, out var subEntryChunk);
        var unCompressedData = CompressionHelper.UnCompress(
            subEntryChunk.CompressionType,
            subEntryChunk.Data,
            subEntryChunk.DataLen,
            subEntryChunk.UnCompressedDataSize);
        position += consumed;
        return new ChunkReader(in unCompressedData);
    }

    private bool TrySetNextSubEntryReader()
    {
        Debug.Assert(_chunk.HasSubEntries);
        if (_currentSequencePosition >= _chunk.Data.Length)
            return false;
        var seq = _chunk.Data.Slice(_currentSequencePosition);
        _reader = SubEntryReader(in seq, ref _currentSequencePosition);
        return true;
    }

    public bool TryGetMessage(out StreamMessage result)
    {
        result = null;
        while (_currentMessageIndex < _totalMessages)
        {
            if (_reader.TryGetMessage(out var msg))
            {
                var offset = _chunk.ChunkId + _currentMessageIndex++;
                result = new StreamMessage
                {
                    Consumer = _consumer,
                    Message = msg,
                    Context = new MessageContext(offset, TimeSpan.FromMilliseconds(_chunk.Timestamp))
                };
                return true;
            }

            if (!_chunk.HasSubEntries || !TrySetNextSubEntryReader())
                break;
        }

        return false;
    }
}

internal struct ChunkReader
{
    private readonly ReadOnlySequence<byte> _sequence;
    private long _currentPosition;

    public ChunkReader(in ReadOnlySequence<byte> sequence)
    {
        _currentPosition = 0;
        _sequence = sequence;
    }

    public bool TryGetMessage(out Message result)
    {
        result = null;
        if (_currentPosition >= _sequence.Length)
            return false;
        var seq = _sequence.Slice(_currentPosition);
        var reader = new SequenceReader<byte>(seq);
        WireFormatting.ReadUInt32(ref reader, out var len);
        result = Message.From(ref reader, len);
        _currentPosition += reader.Consumed;
        return true;
    }
}

public class EnumerableConsumer : AbstractEntity, IConsumer, IDisposable, IAsyncDisposable
{
    private readonly ILogger _logger;
    private readonly EnumerableConsumerConfig _config;
    private bool _disposed;
    private byte _subscriberId;
    private IOffsetType _storedOffsetSpec;
    private readonly CancellationTokenSource _cancellationTokenSource = new();
    private readonly ChannelReader<StreamMessage> _reader;
    private readonly ChannelWriter<StreamMessage> _writer;

    private EnumerableConsumer(Client client, EnumerableConsumerConfig config, ILogger<EnumerableConsumer> logger = null)
    {
        _client = client;
        _config = config;
        _logger = logger ?? NullLogger<EnumerableConsumer>.Instance;
        var channel = Channel.CreateBounded<StreamMessage>(
            new BoundedChannelOptions(config.BufferSize)
            {
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = true,
                SingleWriter = true,
            });
        _reader = channel.Reader;
        _writer = channel.Writer;
    }

    public static async Task<EnumerableConsumer> Create(
        ClientParameters clientParameters,
        EnumerableConsumerConfig config,
        StreamInfo metaStreamInfo,
        ILogger<EnumerableConsumer> logger = null)
    {
        var client = await RoutingHelper<Routing>.LookupRandomConnection(clientParameters, metaStreamInfo, logger);
        var consumer = new EnumerableConsumer((Client)client, config, logger);
        await consumer.Init();
        return consumer;
    }

    private bool MaybeDispatch(ulong offset)
    {
        return _storedOffsetSpec switch
        {
            OffsetTypeOffset offsetTypeOffset => offset > offsetTypeOffset.OffsetValue,
            _ => true
        };
    }

    private async Task Init()
    {
        _client.ConnectionClosed += async reason =>
        {
            if (_config.ConnectionClosedHandler != null)
            {
                await _config.ConnectionClosedHandler(reason);
            }
        };
        if (_config.MetadataHandler != null)
        {
            _client.Parameters.MetadataHandler += _config.MetadataHandler;
        }

        var consumerProperties = new Dictionary<string, string>();
        if (!string.IsNullOrEmpty(_config.Reference))
        {
            consumerProperties["name"] = _config.Reference;
        }

        if (_config.IsSingleActiveConsumer)
        {
            consumerProperties["single-active-consumer"] = "true";
            if (!string.IsNullOrEmpty(_config.SuperStream))
            {
                consumerProperties["super-stream"] = _config.SuperStream;
            }
        }

        // this is the default value for the consumer.
        _storedOffsetSpec = _config.OffsetSpec;
        const ushort InitialCredit = 10;
        var (consumerId, response) = await _client.Subscribe(
            _config.Stream,
            _config.OffsetSpec,
            InitialCredit,
            consumerProperties,
            async p =>
            {
                // receive the chunk from the deliver
                // before parsing the chunk, we ask for more credits
                // in this way we keep the network busy
                await _client.Credit(p.SubscriptionId, 1);
                var token = _cancellationTokenSource.Token;
                if (token.IsCancellationRequested)
                    return;
                _logger.LogTrace("Got new chunk {Id}", p.Chunk.ChunkId);
                var chunkConsumer = new ChunkConsumer(p.Chunk, this);
                // TODO: Since we're the only channel writer, we know how much free space is there, so it makes sense to read messages from the chunk in batches.
                while (chunkConsumer.TryGetMessage(out var message))
                {
                    try
                    {
                        if (MaybeDispatch(message.Context.Offset))
                        {
                            await _writer.WriteAsync(message, token);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // The consumer is closing: stop parsing the rest of the chunk
                        // instead of retrying a write that will be canceled again.
                        return;
                    }
                }

                _logger.LogTrace("Chunk {Id} consumed", p.Chunk.ChunkId);
            },
            async p =>
            {
                if (_config.ConsumerUpdateListener != null)
                {
                    // in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
                    // since the user decided to override the default behavior
                    _storedOffsetSpec = await _config.ConsumerUpdateListener(
                        _config.Reference,
                        _config.Stream,
                        p);
                }

                return _storedOffsetSpec;
            });
        if (response.ResponseCode == ResponseCode.Ok)
        {
            _subscriberId = consumerId;
            return;
        }

        throw new CreateConsumerException($"consumer could not be created code: {response.ResponseCode}");
    }

    public async Task StoreOffset(ulong offset) => await _client.StoreOffset(_config.Reference, _config.Stream, offset);

    public async Task<ResponseCode> Close()
    {
        if (_client.IsClosed)
        {
            return ResponseCode.Ok;
        }

        var result = ResponseCode.Ok;
        try
        {
            _cancellationTokenSource.Cancel();
            _writer.Complete();
            var deleteConsumerResponseTask = _client.Unsubscribe(_subscriberId);
            // The default timeout is usually 10 seconds
            // in this case we reduce the waiting time
            // the consumer could be removed because of stream deleted
            // so it is not necessary to wait.
            await deleteConsumerResponseTask.WaitAsync(TimeSpan.FromSeconds(3));
            if (deleteConsumerResponseTask.IsCompletedSuccessfully)
            {
                result = deleteConsumerResponseTask.Result.ResponseCode;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error removing the consumer id: {SubscriberId} from the server", _subscriberId);
        }

        var closed = _client.MaybeClose($"_client-close-subscriber: {_subscriberId}");
        ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-subscriber: {_subscriberId}");
        _disposed = true;
        _logger.LogDebug("Consumer {SubscriberId} closed", _subscriberId);
        return result;
    }

    public void Dispose() => Dispose(true);

    public async ValueTask DisposeAsync()
    {
        await DisposeAsyncCore();
        Dispose(false);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed || !disposing)
            return;
        var closeConsumer = Close();
        closeConsumer.Wait(TimeSpan.FromSeconds(1));
        ClientExceptions.MaybeThrowException(closeConsumer.Result, $"Error while removing the consumer. Subscriber: {_subscriberId}");
    }

    protected virtual async ValueTask DisposeAsyncCore()
    {
        if (_disposed)
            return;
        var closeConsumer = await Close();
        ClientExceptions.MaybeThrowException(closeConsumer, $"Error while removing the consumer. Subscriber: {_subscriberId}");
    }

    public async IAsyncEnumerable<StreamMessage> Consume([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Dispose the linked source when enumeration ends so it does not stay
        // registered on the long-lived consumer token.
        using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken);
        var token = linkedSource.Token;
        while (!token.IsCancellationRequested)
        {
            StreamMessage message;
            try
            {
                message = await _reader.ReadAsync(token);
                _logger.LogTrace("Got message with offset {Offset}", message.Context.Offset);
            }
            catch (OperationCanceledException)
            {
                yield break;
            }

            yield return message;
        }
    }
}
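
For comparison, the "wrap the callback-based consumer" alternative mentioned above can be sketched without touching any library internals. This is only an illustration of the callback-to-IAsyncEnumerable pattern, not part of the client: `CallbackToAsyncEnumerable<T>`, `OnMessage` and `Complete` are hypothetical names, and the only real API used is `System.Threading.Channels`. The bounded channel gives the same backpressure as the `BufferSize` setting in the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical adapter: turns push-style callbacks into a pull-style async stream.
public sealed class CallbackToAsyncEnumerable<T>
{
    private readonly Channel<T> _channel;

    public CallbackToAsyncEnumerable(int capacity)
    {
        // Bounded + Wait: the producer is suspended when the buffer is full.
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
        });
    }

    // Pass this method as the message handler of a callback-based consumer.
    public Task OnMessage(T item) => _channel.Writer.WriteAsync(item).AsTask();

    // Call when the underlying consumer is closed; readers then finish normally.
    public void Complete() => _channel.Writer.TryComplete();

    // Pull side: items are yielded in arrival order until Complete() is called.
    public IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default)
        => _channel.Reader.ReadAllAsync(cancellationToken);
}
```

The drawback described in the post remains: an adapter like this sits on top of the existing consumer, so it cannot control Subscribe/Unsubscribe or skip parsing work the wrapped consumer has already done.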
-
Thank you. It seems like a huge change!
What would be the benefit? We did several performance tests. The client is as fast as the other clients.
-
To be fair, it's a pretty big chunk of code, but the internal => public changes are relatively small and can be reduced further. And yes, I don't believe IAsyncEnumerable would bring much in terms of performance. It's more about convenience. Let's say my RabbitMQ stream contains incoming messages. I've got a dozen consumers that at some point wake up, start scanning the stream from a specified offset (each has its own), get a batch of N messages they are interested in (possibly waiting until the batch is full), and do some work. For this scenario, performance (in terms of "how fast I can get messages from the stream") is not necessarily the key factor. In some cases, I would rather sacrifice some speed to avoid parsing data I don't need.

await foreach (var msg in consumer.Consume(cancellationToken))
{
    // MatchesConditionA is a placeholder for whatever check applies.
    if (MatchesConditionA(msg.Message))
    {
        receivedMessages.Add(msg.Message);
        if (receivedMessages.Count >= 5)
            break;
    }
}

// Some code...

// Alright, keep consuming again.
await foreach (var msg in consumer.Consume(cancellationToken))
{
    // Now we want another condition (MatchesConditionB is also a placeholder).
    if (MatchesConditionB(msg.Message))
    {
        receivedMessages.Add(msg.Message);
        if (receivedMessages.Count >= 50)
            break;
    }
}

Even without Reactive Extensions (which seems to be dead), this looks "nicer" and cleaner. On the other hand, I'm a little bit worried about the blocking call inside the RawConsumer client.Subscribe handler. I understand why it's there, but if I've got a large number of consumers with a relatively slow consumption rate, I'm asking for trouble. Again, I'm not sure I'm getting all the existing implementation details right, so feel free to correct me if I'm wrong.

EDIT: Just want to emphasize: I'm not talking about including extra consumers in the core library; I just want the core library to enable me to build one myself.
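
The "collect N matching messages, then stop" loops above could also be factored into a small reusable helper. The sketch below is hypothetical (`AsyncBatchExtensions` and `TakeMatchingAsync` are not part of the client or of .NET) and works on any `IAsyncEnumerable<T>`. It assumes the source keeps unread items buffered between enumerations, as the channel-backed `Consume` from the earlier post appears to do, so that a later call resumes where the previous one stopped.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncBatchExtensions
{
    // Hypothetical helper: collects up to `count` items matching `predicate`,
    // then stops enumerating the source.
    public static async Task<List<T>> TakeMatchingAsync<T>(
        this IAsyncEnumerable<T> source,
        Func<T, bool> predicate,
        int count,
        CancellationToken cancellationToken = default)
    {
        var batch = new List<T>();
        if (count <= 0)
            return batch;

        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            if (!predicate(item))
                continue;

            batch.Add(item);
            if (batch.Count >= count)
                break; // leaving the loop disposes the enumerator
        }

        return batch;
    }
}
```

With such a helper, the first loop would reduce to something like `await consumer.Consume(token).TakeMatchingAsync(m => MatchesConditionA(m.Message), 5)`, where `MatchesConditionA` is again a placeholder.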
-
The library does a great job of handling low-level interactions. But at the moment there is no easy way to build custom producers/consumers on top of the existing codebase, because some crucial APIs (like SubEntryChunk and WireFormatting) have the internal modifier. Would it be possible to expose more internal APIs for "external" consumers?
I can open a PR for review if you are interested.