Skip to content

Commit 8546bcf

Browse files
authored
Merge pull request #13918 from rabbitmq/amqp-durable
Follow AMQP spec for durable field
2 parents 4f1076d + 67895da commit 8546bcf

File tree

4 files changed

+145
-37
lines changed

4 files changed

+145
-37
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,24 +266,25 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
266266
%% following stucture:
267267
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
268268
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
269-
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
270-
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
271-
settled = Settled,
272-
message_format = {uint, ?MESSAGE_FORMAT}},
273-
body = [#'v1_0.data'{content = Body}]};
269+
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
270+
Body = [#'v1_0.data'{content = Bin}],
271+
new(DeliveryTag, Body, Settled);
274272
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
275-
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
276-
settled = Settled,
277-
message_format = {uint, ?MESSAGE_FORMAT}},
278-
body = Body}.
273+
#amqp10_msg{
274+
transfer = #'v1_0.transfer'{
275+
delivery_tag = {binary, DeliveryTag},
276+
settled = Settled,
277+
message_format = {uint, ?MESSAGE_FORMAT}},
278+
%% This lib is safe by default.
279+
header = #'v1_0.header'{durable = true},
280+
body = Body}.
279281

280282
%% @doc Create a new settled amqp10 message using the specified delivery tag
281283
%% and body.
282284
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
283285
new(DeliveryTag, Body) ->
284286
new(DeliveryTag, Body, false).
285287

286-
287288
% First 3 octets are the format
288289
% the last 1 octet is the version
289290
% See 2.8.11 in the spec

deps/rabbit/src/mc_amqp.erl

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -251,30 +251,29 @@ routing_headers(Msg, Opts) ->
251251
List = application_properties_as_simple_map(Msg, X),
252252
maps:from_list(List).
253253

254-
get_property(durable, Msg) ->
255-
case Msg of
256-
#msg_body_encoded{header = #'v1_0.header'{durable = Durable}}
257-
when is_boolean(Durable) ->
258-
Durable;
254+
get_property(durable, #msg_body_encoded{header = Header} = Msg) ->
255+
case Header of
256+
#'v1_0.header'{durable = D} when is_boolean(D) ->
257+
D;
259258
_ ->
260259
%% fallback in case the source protocol was old AMQP 0.9.1
261260
case message_annotation(<<"x-basic-delivery-mode">>, Msg, undefined) of
262-
{ubyte, 1} ->
263-
false;
261+
{ubyte, 2} ->
262+
true;
264263
_ ->
265-
true
264+
false
266265
end
267266
end;
268-
get_property(timestamp, Msg) ->
269-
case Msg of
270-
#msg_body_encoded{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} ->
267+
get_property(timestamp, #msg_body_encoded{properties = Properties}) ->
268+
case Properties of
269+
#'v1_0.properties'{creation_time = {timestamp, Ts}} ->
271270
Ts;
272271
_ ->
273272
undefined
274273
end;
275-
get_property(ttl, Msg) ->
276-
case Msg of
277-
#msg_body_encoded{header = #'v1_0.header'{ttl = {uint, Ttl}}} ->
274+
get_property(ttl, #msg_body_encoded{header = Header} = Msg) ->
275+
case Header of
276+
#'v1_0.header'{ttl = {uint, Ttl}} ->
278277
Ttl;
279278
_ ->
280279
%% fallback in case the source protocol was AMQP 0.9.1
@@ -286,9 +285,9 @@ get_property(ttl, Msg) ->
286285
undefined
287286
end
288287
end;
289-
get_property(priority, Msg) ->
290-
case Msg of
291-
#msg_body_encoded{header = #'v1_0.header'{priority = {ubyte, Priority}}} ->
288+
get_property(priority, #msg_body_encoded{header = Header} = Msg) ->
289+
case Header of
290+
#'v1_0.header'{priority = {ubyte, Priority}} ->
292291
Priority;
293292
_ ->
294293
%% fallback in case the source protocol was AMQP 0.9.1
@@ -319,10 +318,7 @@ protocol_state(#msg_body_encoded{header = Header0,
319318
[encode(Sections), BareAndFooter];
320319
protocol_state(#v1{message_annotations = MA0,
321320
bare_and_footer = BareAndFooter}, Anns) ->
322-
Durable = case Anns of
323-
#{?ANN_DURABLE := D} -> D;
324-
_ -> true
325-
end,
321+
Durable = maps:get(?ANN_DURABLE, Anns, true),
326322
Priority = case Anns of
327323
#{?ANN_PRIORITY := P}
328324
when is_integer(P) ->
@@ -667,7 +663,9 @@ binary_part_bare_and_footer(Payload, Start) ->
667663
binary_part(Payload, Start, byte_size(Payload) - Start).
668664

669665
update_header_from_anns(undefined, Anns) ->
670-
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
666+
Durable = maps:get(?ANN_DURABLE, Anns, true),
667+
Header = #'v1_0.header'{durable = Durable},
668+
update_header_from_anns(Header, Anns);
671669
update_header_from_anns(Header, Anns) ->
672670
DeliveryCount = case Anns of
673671
#{delivery_count := C} -> C;

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ groups() ->
5858
sender_settle_mode_unsettled,
5959
sender_settle_mode_unsettled_fanout,
6060
sender_settle_mode_mixed,
61+
durable_field_classic_queue,
62+
durable_field_quorum_queue,
63+
durable_field_stream,
6164
invalid_transfer_settled_flag,
6265
quorum_queue_rejects,
6366
receiver_settle_mode_first,
@@ -916,6 +919,77 @@ sender_settle_mode_mixed(Config) ->
916919
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
917920
ok = close(Init).
918921

922+
durable_field_classic_queue(Config) ->
923+
QName = atom_to_binary(?FUNCTION_NAME),
924+
durable_field(Config, <<"classic">>, QName).
925+
926+
durable_field_quorum_queue(Config) ->
927+
QName = atom_to_binary(?FUNCTION_NAME),
928+
durable_field(Config, <<"quorum">>, QName).
929+
930+
durable_field_stream(Config) ->
931+
QName = atom_to_binary(?FUNCTION_NAME),
932+
durable_field(Config, <<"stream">>, QName).
933+
934+
durable_field(Config, QueueType, QName)
935+
when is_binary(QueueType) ->
936+
Address = rabbitmq_amqp_address:queue(QName),
937+
{_Connection, Session, LinkPair} = Init = init(Config),
938+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}},
939+
{ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
940+
{ok, Sender} = amqp10_client:attach_sender_link(
941+
Session, <<"test-sender">>, Address, unsettled),
942+
wait_for_credit(Sender),
943+
944+
ok = amqp10_client:send_msg(Sender,
945+
amqp10_msg:set_headers(
946+
#{durable => true},
947+
amqp10_msg:new(<<"t1">>, <<"durable">>))),
948+
ok = amqp10_client:send_msg(Sender,
949+
amqp10_msg:set_headers(
950+
#{durable => false},
951+
amqp10_msg:new(<<"t2">>, <<"non-durable">>))),
952+
%% Even though the AMQP spec defines durable=false as default
953+
%% (i.e. durable is false if the field is omitted on the wire),
954+
%% we expect our AMQP Erlang library to be safe by default,
955+
%% and therefore send the message as durable=true on behalf of us.
956+
ok = amqp10_client:send_msg(
957+
Sender, amqp10_msg:new(<<"t3">>, <<"lib publishes as durable by default">>)),
958+
%% When we expliclitly publish without a header section, RabbitMQ should interpret
959+
%% durable as false according to the AMQP spec.
960+
ok = amqp10_client:send_msg(
961+
Sender,
962+
amqp10_msg:from_amqp_records(
963+
[#'v1_0.transfer'{delivery_tag = {binary, <<"t4">>},
964+
settled = false,
965+
message_format = {uint, 0}},
966+
#'v1_0.data'{content = <<"publish without header section">>}])),
967+
968+
ok = wait_for_accepts(4),
969+
ok = detach_link_sync(Sender),
970+
flush(sent),
971+
972+
Filter = consume_from_first(QueueType),
973+
{ok, Receiver} = amqp10_client:attach_receiver_link(
974+
Session, <<"test-receiver">>, Address, unsettled,
975+
none, Filter),
976+
977+
ok = amqp10_client:flow_link_credit(Receiver, 4, never),
978+
[M1, M2, M3, M4] = Msgs = receive_messages(Receiver, 4),
979+
?assertEqual(<<"durable">>, amqp10_msg:body_bin(M1)),
980+
?assertMatch(#{durable := true}, amqp10_msg:headers(M1)),
981+
?assertEqual(<<"non-durable">>, amqp10_msg:body_bin(M2)),
982+
?assertMatch(#{durable := false}, amqp10_msg:headers(M2)),
983+
?assertEqual(<<"lib publishes as durable by default">>, amqp10_msg:body_bin(M3)),
984+
?assertMatch(#{durable := true}, amqp10_msg:headers(M3)),
985+
?assertEqual(<<"publish without header section">>, amqp10_msg:body_bin(M4)),
986+
?assertMatch(#{durable := false}, amqp10_msg:headers(M4)),
987+
[ok = amqp10_client:accept_msg(Receiver, M) || M <- Msgs],
988+
989+
ok = detach_link_sync(Receiver),
990+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
991+
close(Init).
992+
919993
invalid_transfer_settled_flag(Config) ->
920994
OpnConf = connection_config(Config),
921995
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@@ -1301,7 +1375,7 @@ amqp_amqpl(QType, Config) ->
13011375
Body6 = [#'v1_0.data'{content = <<0, 1>>},
13021376
#'v1_0.data'{content = <<2, 3>>}],
13031377

1304-
%% Send only body sections
1378+
%% Send only header and body sections
13051379
[ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<>>, Body, true)) ||
13061380
Body <- [Body1, Body2, Body3, Body4, Body5, Body6]],
13071381
%% Send with application-properties
@@ -1342,6 +1416,11 @@ amqp_amqpl(QType, Config) ->
13421416
#{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>},
13431417
{utf8, <<"e2">>}]}},
13441418
amqp10_msg:new(<<>>, Body1, true))),
1419+
ok = amqp10_client:send_msg(
1420+
Sender,
1421+
amqp10_msg:set_headers(
1422+
#{durable => false},
1423+
amqp10_msg:new(<<>>, Body1, true))),
13451424

13461425
ok = amqp10_client:detach_link(Sender),
13471426
flush(detached),
@@ -1365,8 +1444,10 @@ amqp_amqpl(QType, Config) ->
13651444
receive {#'basic.deliver'{consumer_tag = CTag,
13661445
redelivered = false},
13671446
#amqp_msg{payload = Payload1,
1368-
props = #'P_basic'{type = <<"amqp-1.0">>}}} ->
1369-
?assertEqual([Body1], amqp10_framing:decode_bin(Payload1))
1447+
props = #'P_basic'{delivery_mode = DelMode2,
1448+
type = <<"amqp-1.0">>}}} ->
1449+
?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)),
1450+
?assertEqual(2, DelMode2)
13701451
after 30000 -> ct:fail({missing_deliver, ?LINE})
13711452
end,
13721453
receive {_, #amqp_msg{payload = Payload2,
@@ -1428,6 +1509,12 @@ amqp_amqpl(QType, Config) ->
14281509
rabbit_misc:table_lookup(Headers11, <<"x-array">>))
14291510
after 30000 -> ct:fail({missing_deliver, ?LINE})
14301511
end,
1512+
receive {_, #amqp_msg{payload = Payload12,
1513+
props = #'P_basic'{delivery_mode = DelMode1}}} ->
1514+
?assertEqual([Body1], amqp10_framing:decode_bin(Payload12)),
1515+
?assertNotEqual(2, DelMode1)
1516+
after 30000 -> ct:fail({missing_deliver, ?LINE})
1517+
end,
14311518

14321519
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
14331520
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
@@ -1514,10 +1601,17 @@ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) ->
15141601
amqp_channel:cast(
15151602
Ch,
15161603
#'basic.publish'{routing_key = QName},
1517-
#amqp_msg{props = #'P_basic'{headers = Amqp091Headers},
1604+
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
1605+
priority = 5,
1606+
headers = Amqp091Headers},
15181607
payload = <<"foobar">>}),
15191608

15201609
{ok, [Msg]} = drain_queue(Session, Address, 1),
1610+
1611+
?assertMatch(#{durable := true,
1612+
priority := 5},
1613+
amqp10_msg:headers(Msg)),
1614+
15211615
Amqp10MA = amqp10_msg:message_annotations(Msg),
15221616
?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10MA, undefined)),
15231617
?assertEqual(92, maps:get(<<"x-int">>, Amqp10MA, undefined)),
@@ -3278,7 +3372,7 @@ max_message_size_client_to_server(Config) ->
32783372
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed),
32793373
ok = wait_for_credit(Sender),
32803374

3281-
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10),
3375+
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 20),
32823376
?assertEqual(ok,
32833377
amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))),
32843378
ok = wait_for_accepted(<<"t1">>),

release-notes/4.2.0.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,21 @@
33
RabbitMQ 4.2.0 is a new feature release.
44

55

6+
## Breaking Changes and Compatibility Notes
7+
8+
### Default value for AMQP 1.0 `durable` field.
9+
10+
Starting with RabbitMQ 4.2, if a sending client omits the header section, RabbitMQ [assumes](https://github.com/rabbitmq/rabbitmq-server/pull/13918) the `durable` field to be `false` complying with the AMQP 1.0 spec:
11+
```
12+
<field name="durable" type="boolean" default="false"/>
13+
```
14+
15+
AMQP 1.0 apps or client libraries must set the `durable` field of the header section to `true` to mark the message as durable.
16+
17+
Team RabbitMQ recommends client libraries to send messages as durable by default.
18+
All AMQP 1.0 client libraries [maintained by Team RabbitMQ](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) send messages as durable by default.
19+
20+
621
## Features
722

823
### Incoming and Outgoing Message Interceptors for native protocols

0 commit comments

Comments
 (0)