Skip to content

Write to the outgoing pipeline directly #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ jobs:
# Note: the cache path is relative to the workspace directory
# https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action
path: ~/installers
key: ${{ runner.os }}-v3-${{ hashFiles('.ci/windows/versions.json') }}
key: ${{ runner.os }}-v4-${{ hashFiles('.ci/windows/versions.json') }}
- uses: actions/cache@v4
with:
path: |
~/.nuget/packages
~/AppData/Local/NuGet/v4-cache
key: ${{ runner.os }}-v3-nuget-${{ hashFiles('**/*.csproj','./Directory.Packages.props') }}
key: ${{ runner.os }}-v4-nuget-${{ hashFiles('**/*.csproj','./Directory.Packages.props') }}
restore-keys: |
${{ runner.os }}-v3-nuget-
${{ runner.os }}-v4-nuget-
- name: Install and start RabbitMQ
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Build (Debug)
Expand All @@ -46,9 +46,9 @@ jobs:
path: |
~/.nuget/packages
~/.local/share/NuGet/v4-cache
key: ${{ runner.os }}-v3-nuget-${{ hashFiles('**/*.csproj','./Directory.Packages.props') }}
key: ${{ runner.os }}-v4-nuget-${{ hashFiles('**/*.csproj','./Directory.Packages.props') }}
restore-keys: |
${{ runner.os }}-v3-nuget-
${{ runner.os }}-v4-nuget-
- name: Start RabbitMQ
id: start-rabbitmq
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public OutgoingMsg(byte publisherId, ulong publishingId, Message data)
public Message Data => data;
public int SizeNeeded => 0;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
14 changes: 8 additions & 6 deletions RabbitMQ.Stream.Client/CloseRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client
{
Expand All @@ -20,13 +20,15 @@ public CloseRequest(uint correlationId, string reason)

public int SizeNeeded => 10 + WireFormatting.StringSize(reason);

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteUInt16(span.Slice(offset), 1); //ok code
offset += WireFormatting.WriteString(span.Slice(offset), reason);
offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span[offset..], correlationId);
offset += WireFormatting.WriteUInt16(span[offset..], 1); //ok code
offset += WireFormatting.WriteString(span[offset..], reason);
writer.Advance(offset);
return offset;
}
}
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/CloseResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ public CloseResponse(uint correlationId, ResponseCode responseCode)

public ResponseCode ResponseCode => responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out CloseResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
Expand Down
7 changes: 4 additions & 3 deletions RabbitMQ.Stream.Client/CommandVersionsRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client;

Expand All @@ -11,7 +11,6 @@ namespace RabbitMQ.Stream.Client;
private const ushort Key = 0x001b;
private readonly uint _correlationId;
private readonly ICommandVersions[] _commands = { new PublishFilter() };
// private readonly ICommandVersions[] _commands = {};

public CommandVersionsRequest(uint correlationId)
{
Expand All @@ -29,8 +28,9 @@ public int SizeNeeded
}
}

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span[offset..], _correlationId);
Expand All @@ -43,6 +43,7 @@ public int Write(Span<byte> span)
offset += WireFormatting.WriteUInt16(span[offset..], iCommandVersions.MaxVersion);
}

writer.Advance(offset);
return offset;
}
}
6 changes: 5 additions & 1 deletion RabbitMQ.Stream.Client/CommandVersionsResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ private CommandVersionsResponse(uint correlationId, ResponseCode responseCode, L
}

public int SizeNeeded { get => throw new NotImplementedException(); }
public int Write(Span<byte> span) => throw new NotImplementedException();

public uint CorrelationId { get; }
public ResponseCode ResponseCode { get; }
Expand All @@ -47,4 +46,9 @@ internal static int Read(ReadOnlySequence<byte> frame, out CommandVersionsRespon
command = new CommandVersionsResponse(correlation, (ResponseCode)responseCode, commands);
return offset;
}

public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
}
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/Compression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public int Write(Span<byte> span)
var offset = 0;
foreach (var msg in messages)
{
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)msg.Size);
offset += msg.Write(span.Slice(offset));
offset += WireFormatting.WriteUInt32(span[offset..], (uint)msg.Size);
offset += msg.Write(span[offset..]);
}

return offset;
Expand Down Expand Up @@ -89,8 +89,8 @@ public void Compress(List<Message> messages)
var offset = 0;
foreach (var msg in messages)
{
offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)msg.Size);
offset += msg.Write(span.Slice(offset));
offset += WireFormatting.WriteUInt32(span[offset..], (uint)msg.Size);
offset += msg.Write(span[offset..]);
}

using var compressedMemory = new MemoryStream();
Expand Down
93 changes: 76 additions & 17 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal static class ConnectionClosedReason

public class Connection : IDisposable
{
private readonly Socket socket;
private readonly Socket socket; // TODO underscore prefix
private readonly PipeWriter writer;
private readonly PipeReader reader;
private readonly Task _incomingFramesTask;
Expand Down Expand Up @@ -103,17 +103,50 @@ public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>
return new Connection(socket, commandCallback, closedCallBack, sslOption, logger);
}

public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
public ValueTask<bool> Write<T>(T command) where T : struct, ICommand
{
await WriteCommand(command).ConfigureAwait(false);
// we return true to indicate that the command was written
// In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220
// we made all WriteCommand async so await is enough to indicate that the command was written
// We decided to keep the return value to avoid a breaking change
return true;
if (!_writeLock.Wait(0))
{
// https://blog.marcgravell.com/2018/07/pipe-dreams-part-3.html
var writeSlowPath = WriteCommandAsyncSlowPath(command);
writeSlowPath.ConfigureAwait(false);
return writeSlowPath;
}
else
{
var release = true;
try
{
var payloadSize = WriteCommandPayloadSize(command);
var written = command.Write(writer);
Debug.Assert(payloadSize == written);
var flush = writer.FlushAsync();
flush.ConfigureAwait(false);
if (flush.IsCompletedSuccessfully)
{
// we return true to indicate that the command was written
// In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220
// we made all WriteCommand async so await is enough to indicate that the command was written
// We decided to keep the return value to avoid a breaking change
return ValueTask.FromResult(true);
}
else
{
release = false;
return AwaitFlushThenRelease(flush);
}
}
finally
{
if (release)
{
_writeLock.Release();
}
}
}
}

private async Task WriteCommand<T>(T command) where T : struct, ICommand
private async ValueTask<bool> WriteCommandAsyncSlowPath<T>(T command) where T : struct, ICommand
{
if (Token.IsCancellationRequested)
{
Expand All @@ -129,18 +162,45 @@ private async Task WriteCommand<T>(T command) where T : struct, ICommand
await _writeLock.WaitAsync(Token).ConfigureAwait(false);
try
{
var size = command.SizeNeeded;
var mem = new byte[4 + size]; // + 4 to write the size
WireFormatting.WriteUInt32(mem, (uint)size);
var written = command.Write(mem.AsSpan()[4..]);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem), Token).ConfigureAwait(false);
Debug.Assert(size == written);
await writer.FlushAsync(Token).ConfigureAwait(false);
var payloadSize = WriteCommandPayloadSize(command);
var written = command.Write(writer);
Debug.Assert(payloadSize == written);
await writer.FlushAsync().ConfigureAwait(false);
}
finally
{
_writeLock.Release();
}

return true;
}

private async ValueTask<bool> AwaitFlushThenRelease(ValueTask<FlushResult> task)
{
try
{
await task.ConfigureAwait(false);
}
finally
{
_writeLock.Release();
}

return true;
}

private int WriteCommandPayloadSize<T>(T command) where T : struct, ICommand
{
/*
* TODO FUTURE
* This code could be moved into a common base class for all outgoing
* commands
*/
var payloadSize = command.SizeNeeded;
var span = writer.GetSpan(WireFormatting.SizeofUInt32); // 4 to write the size
WireFormatting.WriteUInt32(span, (uint)payloadSize);
writer.Advance(WireFormatting.SizeofUInt32);
return payloadSize;
}

private async Task ProcessIncomingFrames()
Expand Down Expand Up @@ -168,7 +228,6 @@ private async Task ProcessIncomingFrames()
while (TryReadFrame(ref buffer, out var frame) && !isClosed)
{
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.

var memory =
ArrayPool<byte>.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length);
frame.CopyTo(memory.Span);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private ConsumerUpdateQueryResponse(uint correlationId, byte subscriptionId, byt
public bool IsActive => active == 1;
public int SizeNeeded => throw new NotImplementedException();

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
14 changes: 8 additions & 6 deletions RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client;

Expand All @@ -27,13 +27,15 @@ public int SizeNeeded
}
}

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), _correlationId);
offset += WireFormatting.WriteUInt16(span.Slice(offset), (ushort)ResponseCode.Ok);
offset += OffsetSpecification.Write(span.Slice(offset));
offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span[offset..], _correlationId);
offset += WireFormatting.WriteUInt16(span[offset..], (ushort)ResponseCode.Ok);
offset += OffsetSpecification.Write(span[offset..]);
writer.Advance(offset);
return offset;
}

Expand Down
20 changes: 12 additions & 8 deletions RabbitMQ.Stream.Client/Create.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,23 @@ public int SizeNeeded
}
}

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);

var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteString(span.Slice(offset), stream);
offset += WireFormatting.WriteInt32(span.Slice(offset), arguments.Count);
offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span[offset..], correlationId);
offset += WireFormatting.WriteString(span[offset..], stream);
offset += WireFormatting.WriteInt32(span[offset..], arguments.Count);

foreach (var (key, value) in arguments)
{
offset += WireFormatting.WriteString(span.Slice(offset), key);
offset += WireFormatting.WriteString(span.Slice(offset), value);
offset += WireFormatting.WriteString(span[offset..], key);
offset += WireFormatting.WriteString(span[offset..], value);
}

writer.Advance(offset);
return offset;
}
}
Expand All @@ -71,10 +74,11 @@ public CreateResponse(uint correlationId, ushort responseCode)

public ResponseCode ResponseCode => (ResponseCode)responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out CreateResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/CreateSuperStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ public int SizeNeeded
}
}

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var command = (ICommand)this;
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span[offset..], command.Version);
Expand All @@ -72,6 +73,7 @@ public int Write(Span<byte> span)
offset += WireFormatting.WriteString(span[offset..], value);
}

writer.Advance(offset);
return offset;
}
}
Expand All @@ -94,7 +96,7 @@ private CreateSuperStreamResponse(uint correlationId, ushort responseCode)

public ResponseCode ResponseCode => (ResponseCode)_responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
Loading