Skip to content

change InboundFrame to a class #1613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public PublicationAddress? ReplyToAddress

public ReadOnlyBasicProperties(ReadOnlySpan<byte> span)
{
if (span.IsEmpty)
{
return;
}

int offset = 2;
ref readonly byte bits = ref span[0];
if (bits.IsBitSet(BasicProperties.ContentTypeBit)) { offset += WireFormatting.ReadShortstr(span.Slice(offset), out _contentType); }
Expand Down
77 changes: 33 additions & 44 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ internal sealed class CommandAssembler
private const int MaxArrayOfBytesSize = 2_147_483_591;

private ProtocolCommandId _commandId;
private ReadOnlyMemory<byte> _methodMemory;
private byte[]? _rentedMethodArray;
private ReadOnlyMemory<byte> _headerMemory;
private byte[]? _rentedHeaderArray;
private ReadOnlyMemory<byte> _bodyMemory;
private byte[]? _rentedBodyArray;
private RentedMemory _methodMemory;
private RentedMemory _headerMemory;
private RentedMemory _bodyMemory;
private int _remainingBodyByteCount;
private int _offset;
private AssemblyState _state;
Expand All @@ -66,61 +63,49 @@ public CommandAssembler(uint maxBodyLength)
private void Reset()
{
_commandId = default;
_methodMemory = ReadOnlyMemory<byte>.Empty;
_rentedMethodArray = null;
_headerMemory = ReadOnlyMemory<byte>.Empty;
_rentedHeaderArray = null;
_bodyMemory = ReadOnlyMemory<byte>.Empty;
_rentedBodyArray = null;
_methodMemory = default;
_headerMemory = default;
_bodyMemory = default;
_remainingBodyByteCount = 0;
_offset = 0;
_state = AssemblyState.ExpectingMethod;
}

public bool HandleFrame(in InboundFrame frame, out IncomingCommand command)
public void HandleFrame(InboundFrame frame, out IncomingCommand command)
{
bool shallReturn = true;
switch (_state)
{
case AssemblyState.ExpectingMethod:
ParseMethodFrame(in frame);
shallReturn = false;
ParseMethodFrame(frame);
break;
case AssemblyState.ExpectingContentHeader:
shallReturn = ParseHeaderFrame(in frame);
ParseHeaderFrame(frame);
break;
case AssemblyState.ExpectingContentBody:
shallReturn = ParseBodyFrame(in frame);
ParseBodyFrame(frame);
break;
}

if (_state != AssemblyState.Complete)
{
command = IncomingCommand.Empty;
return shallReturn;
return;
}

RabbitMqClientEventSource.Log.CommandReceived();

var method = new RentedMemory(_methodMemory, _rentedMethodArray);
var header = new RentedMemory(_headerMemory, _rentedHeaderArray);
var body = new RentedMemory(_bodyMemory, _rentedBodyArray);

command = new IncomingCommand(_commandId, method, header, body);
command = new IncomingCommand(_commandId, _methodMemory, _headerMemory, _bodyMemory);
Reset();
return shallReturn;
}

private void ParseMethodFrame(in InboundFrame frame)
private void ParseMethodFrame(InboundFrame frame)
{
if (frame.Type != FrameType.FrameMethod)
{
throw new UnexpectedFrameException(frame.Type);
}

_rentedMethodArray = frame.TakeoverPayload();
_commandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span);
_methodMemory = frame.Payload.Slice(4);
_methodMemory = frame.TakeoverPayload(Framing.Method.ArgumentsOffset);

switch (_commandId)
{
Expand All @@ -136,7 +121,7 @@ private void ParseMethodFrame(in InboundFrame frame)
}
}

private bool ParseHeaderFrame(in InboundFrame frame)
private void ParseHeaderFrame(InboundFrame frame)
{
if (frame.Type != FrameType.FrameHeader)
{
Expand All @@ -150,7 +135,7 @@ private bool ParseHeaderFrame(in InboundFrame frame)
throw new UnknownClassOrMethodException(classId, 0);
}

ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(Framing.Header.BodyLengthOffset));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(frame.Type);
Expand All @@ -162,16 +147,21 @@ private bool ParseHeaderFrame(in InboundFrame frame)
throw new MalformedFrameException(message: msg, canShutdownCleanly: false);
}

_rentedHeaderArray = totalBodyBytes != 0 ? frame.TakeoverPayload() : Array.Empty<byte>();

_headerMemory = frame.Payload.Slice(12);
// There are always at least 2 bytes, even for empty ones
if (frame.Payload.Length <= Framing.Header.HeaderArgumentOffset + 2)
{
frame.TryReturnPayload();
}
else
{
_headerMemory = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset);
}

_remainingBodyByteCount = (int)totalBodyBytes;
UpdateContentBodyState();
return _rentedHeaderArray.Length == 0;
}

private bool ParseBodyFrame(in InboundFrame frame)
private void ParseBodyFrame(InboundFrame frame)
{
if (frame.Type != FrameType.FrameBody)
{
Expand All @@ -184,27 +174,26 @@ private bool ParseBodyFrame(in InboundFrame frame)
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyByteCount} bytes remaining, {payloadLength} bytes received");
}

if (_rentedBodyArray is null)
if (_bodyMemory.RentedArray is null)
{
// check for single frame payload for an early exit
if (payloadLength == _remainingBodyByteCount)
{
_rentedBodyArray = frame.TakeoverPayload();
_bodyMemory = frame.Payload;
_bodyMemory = frame.TakeoverPayload(0);
_state = AssemblyState.Complete;
return false;
return;
}

// Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame
_rentedBodyArray = ArrayPool<byte>.Shared.Rent(_remainingBodyByteCount);
_bodyMemory = new ReadOnlyMemory<byte>(_rentedBodyArray, 0, _remainingBodyByteCount);
var rentedBodyArray = ArrayPool<byte>.Shared.Rent(_remainingBodyByteCount);
_bodyMemory = new RentedMemory(new ReadOnlyMemory<byte>(rentedBodyArray, 0, _remainingBodyByteCount), rentedBodyArray);
}

frame.Payload.Span.CopyTo(_rentedBodyArray.AsSpan(_offset));
frame.Payload.Span.CopyTo(_bodyMemory.RentedArray.AsSpan(_offset));
frame.TryReturnPayload();
_remainingBodyByteCount -= payloadLength;
_offset += payloadLength;
UpdateContentBodyState();
return true;
}

private void UpdateContentBodyState()
Expand Down
20 changes: 9 additions & 11 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,30 @@ await FinishCloseAsync(cts.Token)

private async Task ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
{
InboundFrame frame = new InboundFrame();

while (false == _closed)
{
mainLoopCancellationToken.ThrowIfCancellationRequested();

while (_frameHandler.TryReadFrame(out InboundFrame frame))
while (_frameHandler.TryReadFrame(frame))
{
NotifyHeartbeatListener();
await ProcessFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
}

// Done reading frames synchronously, go async
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancellationToken)
await _frameHandler.ReadFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
NotifyHeartbeatListener();
await ProcessFrameAsync(asyncFrame, mainLoopCancellationToken)
.ConfigureAwait(false);
await ProcessFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
}
}

private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
{
bool shallReturnPayload = true;
if (frame.Channel == 0)
{
if (frame.Type == FrameType.FrameHeartbeat)
Expand All @@ -164,7 +165,7 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance
// quiescing situation, even though technically we
// should be ignoring everything except
// connection.close-ok.
shallReturnPayload = await _session0.HandleFrameAsync(frame, cancellationToken)
await _session0.HandleFrameAsync(frame, cancellationToken)
.ConfigureAwait(false);
}
}
Expand All @@ -182,15 +183,12 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance
// Session itself may be quiescing this particular
// channel, but that's none of our concern.)
ISession session = _sessionManager.Lookup(frame.Channel);
shallReturnPayload = await session.HandleFrameAsync(frame, cancellationToken)
await session.HandleFrameAsync(frame, cancellationToken)
.ConfigureAwait(false);
}
}

if (shallReturnPayload)
{
frame.ReturnPayload();
}
frame.TryReturnPayload();
}

///<remarks>
Expand Down
Loading