From 49a0e4d98f8af1a516d487a42f799bcc8579241b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 19 Mar 2025 11:23:13 +0100 Subject: [PATCH 1/3] add map parse Signed-off-by: Gabriele Santomaggio --- .../AMQP/AmqpWireFormattingRead.cs | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs index 5f827779..361de9ee 100644 --- a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs +++ b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; +using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Text; @@ -18,6 +19,7 @@ private static void PeekType(ref SequenceReader reader, out byte value) { reader.TryPeek(out value); } + internal static int ReadType(ref SequenceReader reader, out byte value) { var read = WireFormatting.ReadByte(ref reader, out value); @@ -154,12 +156,14 @@ internal static int ReadAny(ref SequenceReader reader, out object value) case FormatCode.List32: { offset = ReadListHeader(ref reader, out var fields); + object list = null; for (long i = 0; i < fields; i++) { - offset += ReadAny(ref reader, out _); + offset += ReadAny(ref reader, out list); } - value = null; + + value = list; return offset; } @@ -168,13 +172,20 @@ internal static int ReadAny(ref SequenceReader reader, out object value) { offset = ReadMapHeader(ref reader, out var count); var values = count / 2; + Dictionary map = new(); for (uint i = 0; i < values; i++) { - offset += ReadAny(ref reader, out _); - offset += ReadAny(ref reader, out _); + offset += ReadAny(ref reader, out var v); + if (v is string key) + { + offset += ReadAny(ref reader, out var v2); + map[key] = v2; + } + else + offset += ReadAny(ref reader, out _); } - value = null; + value = map; return offset; } } From 0639afe59000a6b850cedc58716e8a93d60f8343 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 19 Mar 2025 11:30:34 +0100 Subject: [PATCH 2/3] add map parse Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs index 361de9ee..0c819abb 100644 --- a/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs +++ b/RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingRead.cs @@ -162,7 +162,6 @@ internal static int ReadAny(ref SequenceReader reader, out object value) offset += ReadAny(ref reader, out list); } - value = list; return offset; } @@ -182,7 +181,9 @@ internal static int ReadAny(ref SequenceReader reader, out object value) map[key] = v2; } else + { offset += ReadAny(ref reader, out _); + } } value = map; From f3f20023f4b47f6bcf15f209269d2d1c3b3b0808 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 19 Mar 2025 13:43:27 +0100 Subject: [PATCH 3/3] add tests Signed-off-by: Gabriele Santomaggio --- Tests/Amqp10Tests.cs | 25 +++++++++++++++++++++++++ Tests/Resources/shovel_annotations | Bin 0 -> 312 bytes 2 files changed, 25 insertions(+) create mode 100644 Tests/Resources/shovel_annotations diff --git a/Tests/Amqp10Tests.cs b/Tests/Amqp10Tests.cs index 633f7f56..9691c5b1 100644 --- a/Tests/Amqp10Tests.cs +++ b/Tests/Amqp10Tests.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; +using System.Collections.Generic; using System.Linq; using System.Text; using RabbitMQ.Stream.Client; @@ -444,6 +445,30 @@ public void ValidateUuidMessagesFromGo() Assert.Equal(uuid_value, uuid_message.Properties.CorrelationId); } + [Fact] + public void ValidateAnnotationMap() + { + // shovel_annotations is a message with a map annotation + // coming from the Go client and the following configuration: + // source queue: "form" + // destination exchange: "to" + // queue bound to the exchange + // shovel from the source queue to the destination exchange + // the annotations will be added to the message + + var buffer = SystemUtils.GetFileContent("shovel_annotations"); + var reader = new SequenceReader(new ReadOnlySequence(buffer)); + var shovelAnnotation = Message.From(ref reader, (uint)reader.Length); + Assert.NotNull(shovelAnnotation); + Assert.NotNull(shovelAnnotation.Annotations["x-shovelled"]); + var xShovelled = shovelAnnotation.Annotations["x-shovelled"] as Dictionary; + Assert.NotNull(xShovelled); + Assert.Equal("hello-key", xShovelled["dest-exchange-key"]); + Assert.Equal("from", xShovelled["src-queue"]); + Assert.Equal("to", xShovelled["dest-exchange"]); + Assert.Equal("dynamic", xShovelled["shovel-type"]); + } + [Fact] public void ValidateNilMessagesFromGo() { diff --git a/Tests/Resources/shovel_annotations b/Tests/Resources/shovel_annotations new file mode 100644 index 0000000000000000000000000000000000000000..abfeed8c62fc1084a679afa36af8302c21f103b7 GIT binary patch literal 312 zcmZ8cQEq}j5FJ`0Dwo8)noA=_a` zfCWTf-mA$+J Mk^c3pWqHqh1?Hh^WdHyG literal 0 HcmV?d00001