diff --git a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs index 5f827779..0c819abb 100644 --- a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs +++ b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; +using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Text; @@ -18,6 +19,7 @@ private static void PeekType(ref SequenceReader reader, out byte value) { reader.TryPeek(out value); } + internal static int ReadType(ref SequenceReader reader, out byte value) { var read = WireFormatting.ReadByte(ref reader, out value); @@ -154,12 +156,13 @@ internal static int ReadAny(ref SequenceReader reader, out object value) case FormatCode.List32: { offset = ReadListHeader(ref reader, out var fields); + object list = null; for (long i = 0; i < fields; i++) { - offset += ReadAny(ref reader, out _); + offset += ReadAny(ref reader, out list); } - value = null; + value = list; return offset; } @@ -168,13 +171,22 @@ internal static int ReadAny(ref SequenceReader reader, out object value) { offset = ReadMapHeader(ref reader, out var count); var values = count / 2; + Dictionary map = new(); for (uint i = 0; i < values; i++) { - offset += ReadAny(ref reader, out _); - offset += ReadAny(ref reader, out _); + offset += ReadAny(ref reader, out var v); + if (v is string key) + { + offset += ReadAny(ref reader, out var v2); + map[key] = v2; + } + else + { + offset += ReadAny(ref reader, out _); + } } - value = null; + value = map; return offset; } } diff --git a/Tests/Amqp10Tests.cs b/Tests/Amqp10Tests.cs index 633f7f56..9691c5b1 100644 --- a/Tests/Amqp10Tests.cs +++ b/Tests/Amqp10Tests.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; +using System.Collections.Generic; using System.Linq; using System.Text; using RabbitMQ.Stream.Client; @@ -444,6 +445,30 @@ public void ValidateUuidMessagesFromGo() Assert.Equal(uuid_value, uuid_message.Properties.CorrelationId); } + [Fact] + public void ValidateAnnotationMap() + { + // shovel_annotations is a message with a map annotation + // coming from the Go client and the following configuration: + // source queue: "form" + // destination exchange: "to" + // queue bound to the exchange + // shovel from the source queue to the destination exchange + // the annotations will be added to the message + + var buffer = SystemUtils.GetFileContent("shovel_annotations"); + var reader = new SequenceReader(new ReadOnlySequence(buffer)); + var shovelAnnotation = Message.From(ref reader, (uint)reader.Length); + Assert.NotNull(shovelAnnotation); + Assert.NotNull(shovelAnnotation.Annotations["x-shovelled"]); + var xShovelled = shovelAnnotation.Annotations["x-shovelled"] as Dictionary; + Assert.NotNull(xShovelled); + Assert.Equal("hello-key", xShovelled["dest-exchange-key"]); + Assert.Equal("from", xShovelled["src-queue"]); + Assert.Equal("to", xShovelled["dest-exchange"]); + Assert.Equal("dynamic", xShovelled["shovel-type"]); + } + [Fact] public void ValidateNilMessagesFromGo() { diff --git a/Tests/Resources/shovel_annotations b/Tests/Resources/shovel_annotations new file mode 100644 index 00000000..abfeed8c Binary files /dev/null and b/Tests/Resources/shovel_annotations differ