Skip to content

Commit be064ff

Browse files
committed
Follow AMQP spec for durable field
The AMQP spec defines: ``` <field name="durable" type="boolean" default="false"/> ``` RabbitMQ 4.0 and 4.1 interpret the durable field as true if not set. The idea was to favour safety over performance. This complies with the AMQP spec because the spec allows other target or node specific defaults for the durable field: > If the header section is omitted the receiver MUST assume the appropriate > default values (or the meaning implied by no value being set) for the fields > within the header unless other target or node specific defaults have otherwise > been set. However, some client libraries completely omit the header section if the app expliclity sets durable=false. This complies with the spec, but it means that RabbitMQ cannot diffentiate between "client app forgot to set the durable field" vs "client lib opted in for an optimisation omitting the header section". This is problematic with JMS message selectors where JMS apps can filter on JMSDeliveryMode. To be able to correctly filter on JMSDeliveryMode, RabbitMQ needs to know whether the JMS app sent the message as PERSISTENT or NON_PERSISTENT. Rather than relying on client libs to always send the header section including the durable field, this commit makes RabbitMQ comply with the default value for durable in the AMQP spec. Some client lib maintainers accepted to send the header section, while other maintainers refused to do so: Azure/go-amqp#330 https://issues.apache.org/jira/browse/QPIDJMS-608 Likely the AMQP spec was designed to omit the header section when performance is important, as is the case with durable=false. Omitting the header section means saving a few bytes per message on the wire and some marshalling and unmarshalling overhead on both client and server. Therefore, it's better to push the "safe by default" behaviour from the broker back to the client libs. Client libs should send messages as durable by default unless the client app expliclity opts in to send messages as non-durable. This is also what JMS does: By default JMS apps send messages as PERSISTENT: > The message producer's default delivery mode is PERSISTENT. Therefore, this commit also makes the AMQP Erlang client send messages as durable, by default. This commit will apply to RabbitMQ 4.2. It's arguably not a breaking change because in RabbitMQ, message durability is actually more determined by the queue type the message is sent to rather than the durable field of the message: * Quroum queues and streams store messages durably (fsync or replicate) no matter what the durable field is * MQTT QoS 0 queues hold messages in memory no matter what the durable field is * Classic queues do not fsync even if the durable field is set to true In addition, the RabbitMQ AMQP Java library introduced in RabbitMQ 4.0 sends messages with durable=true: https://github.com/rabbitmq/rabbitmq-amqp-java-client/blob/53e3dd6abbcbce8ca4f2257da56b314786b037cc/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java#L91 The tests for selecting messages by JMSDeliveryMode relying on the behaviour in this commit can be found on the `jms` branch.
1 parent 4f1076d commit be064ff

File tree

3 files changed

+121
-26
lines changed

3 files changed

+121
-26
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,24 +266,25 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
266266
%% following stucture:
267267
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
268268
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
269-
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
270-
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
271-
settled = Settled,
272-
message_format = {uint, ?MESSAGE_FORMAT}},
273-
body = [#'v1_0.data'{content = Body}]};
269+
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
270+
Body = [#'v1_0.data'{content = Bin}],
271+
new(DeliveryTag, Body, Settled);
274272
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
275-
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
276-
settled = Settled,
277-
message_format = {uint, ?MESSAGE_FORMAT}},
278-
body = Body}.
273+
#amqp10_msg{
274+
transfer = #'v1_0.transfer'{
275+
delivery_tag = {binary, DeliveryTag},
276+
settled = Settled,
277+
message_format = {uint, ?MESSAGE_FORMAT}},
278+
%% This lib is safe by default.
279+
header = #'v1_0.header'{durable = true},
280+
body = Body}.
279281

280282
%% @doc Create a new settled amqp10 message using the specified delivery tag
281283
%% and body.
282284
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
283285
new(DeliveryTag, Body) ->
284286
new(DeliveryTag, Body, false).
285287

286-
287288
% First 3 octets are the format
288289
% the last 1 octet is the version
289290
% See 2.8.11 in the spec

deps/rabbit/src/mc_amqp.erl

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,10 @@ get_property(durable, Msg) ->
259259
_ ->
260260
%% fallback in case the source protocol was old AMQP 0.9.1
261261
case message_annotation(<<"x-basic-delivery-mode">>, Msg, undefined) of
262-
{ubyte, 1} ->
263-
false;
262+
{ubyte, 2} ->
263+
true;
264264
_ ->
265-
true
265+
false
266266
end
267267
end;
268268
get_property(timestamp, Msg) ->
@@ -319,10 +319,6 @@ protocol_state(#msg_body_encoded{header = Header0,
319319
[encode(Sections), BareAndFooter];
320320
protocol_state(#v1{message_annotations = MA0,
321321
bare_and_footer = BareAndFooter}, Anns) ->
322-
Durable = case Anns of
323-
#{?ANN_DURABLE := D} -> D;
324-
_ -> true
325-
end,
326322
Priority = case Anns of
327323
#{?ANN_PRIORITY := P}
328324
when is_integer(P) ->
@@ -337,8 +333,7 @@ protocol_state(#v1{message_annotations = MA0,
337333
_ ->
338334
undefined
339335
end,
340-
Header = update_header_from_anns(#'v1_0.header'{durable = Durable,
341-
priority = Priority,
336+
Header = update_header_from_anns(#'v1_0.header'{priority = Priority,
342337
ttl = Ttl}, Anns),
343338
MA = protocol_state_message_annotations(MA0, Anns),
344339
Sections = to_sections(Header, MA, []),
@@ -667,8 +662,12 @@ binary_part_bare_and_footer(Payload, Start) ->
667662
binary_part(Payload, Start, byte_size(Payload) - Start).
668663

669664
update_header_from_anns(undefined, Anns) ->
670-
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
665+
update_header_from_anns(#'v1_0.header'{}, Anns);
671666
update_header_from_anns(Header, Anns) ->
667+
Durable = case Anns of
668+
#{?ANN_DURABLE := D} -> D;
669+
_ -> true
670+
end,
672671
DeliveryCount = case Anns of
673672
#{delivery_count := C} -> C;
674673
_ -> 0
@@ -680,7 +679,8 @@ update_header_from_anns(Header, Anns) ->
680679
FirstAcq = not Redelivered andalso
681680
DeliveryCount =:= 0 andalso
682681
not is_map_key(deaths, Anns),
683-
Header#'v1_0.header'{first_acquirer = FirstAcq,
682+
Header#'v1_0.header'{durable = Durable,
683+
first_acquirer = FirstAcq,
684684
delivery_count = {uint, DeliveryCount}}.
685685

686686
encode_deaths(Deaths) ->

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ groups() ->
5858
sender_settle_mode_unsettled,
5959
sender_settle_mode_unsettled_fanout,
6060
sender_settle_mode_mixed,
61+
durable_field_classic_queue,
62+
durable_field_quorum_queue,
63+
durable_field_stream,
6164
invalid_transfer_settled_flag,
6265
quorum_queue_rejects,
6366
receiver_settle_mode_first,
@@ -916,6 +919,77 @@ sender_settle_mode_mixed(Config) ->
916919
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
917920
ok = close(Init).
918921

922+
durable_field_classic_queue(Config) ->
923+
QName = atom_to_binary(?FUNCTION_NAME),
924+
durable_field(Config, <<"classic">>, QName).
925+
926+
durable_field_quorum_queue(Config) ->
927+
QName = atom_to_binary(?FUNCTION_NAME),
928+
durable_field(Config, <<"quorum">>, QName).
929+
930+
durable_field_stream(Config) ->
931+
QName = atom_to_binary(?FUNCTION_NAME),
932+
durable_field(Config, <<"stream">>, QName).
933+
934+
durable_field(Config, QueueType, QName)
935+
when is_binary(QueueType) ->
936+
Address = rabbitmq_amqp_address:queue(QName),
937+
{_Connection, Session, LinkPair} = Init = init(Config),
938+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}},
939+
{ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
940+
{ok, Sender} = amqp10_client:attach_sender_link(
941+
Session, <<"test-sender">>, Address, unsettled),
942+
wait_for_credit(Sender),
943+
944+
ok = amqp10_client:send_msg(Sender,
945+
amqp10_msg:set_headers(
946+
#{durable => true},
947+
amqp10_msg:new(<<"t1">>, <<"durable">>))),
948+
ok = amqp10_client:send_msg(Sender,
949+
amqp10_msg:set_headers(
950+
#{durable => false},
951+
amqp10_msg:new(<<"t2">>, <<"non-durable">>))),
952+
%% Even though the AMQP spec defines durable=false as default
953+
%% (i.e. durable is false if the field is omitted on the wire),
954+
%% we expect our AMQP Erlang library to be safe by default,
955+
%% and therefore send the message as durable=true on behalf of us.
956+
ok = amqp10_client:send_msg(
957+
Sender, amqp10_msg:new(<<"t3">>, <<"lib publishes as durable by default">>)),
958+
%% When we expliclitly publish without a header section, RabbitMQ should interpret
959+
%% durable as false according to the AMQP spec.
960+
ok = amqp10_client:send_msg(
961+
Sender,
962+
amqp10_msg:from_amqp_records(
963+
[#'v1_0.transfer'{delivery_tag = {binary, <<"t4">>},
964+
settled = false,
965+
message_format = {uint, 0}},
966+
#'v1_0.data'{content = <<"publish without header section">>}])),
967+
968+
ok = wait_for_accepts(4),
969+
ok = detach_link_sync(Sender),
970+
flush(sent),
971+
972+
Filter = consume_from_first(QueueType),
973+
{ok, Receiver} = amqp10_client:attach_receiver_link(
974+
Session, <<"test-receiver">>, Address, unsettled,
975+
none, Filter),
976+
977+
ok = amqp10_client:flow_link_credit(Receiver, 4, never),
978+
[M1, M2, M3, M4] = Msgs = receive_messages(Receiver, 4),
979+
?assertEqual(<<"durable">>, amqp10_msg:body_bin(M1)),
980+
?assertMatch(#{durable := true}, amqp10_msg:headers(M1)),
981+
?assertEqual(<<"non-durable">>, amqp10_msg:body_bin(M2)),
982+
?assertMatch(#{durable := false}, amqp10_msg:headers(M2)),
983+
?assertEqual(<<"lib publishes as durable by default">>, amqp10_msg:body_bin(M3)),
984+
?assertMatch(#{durable := true}, amqp10_msg:headers(M3)),
985+
?assertEqual(<<"publish without header section">>, amqp10_msg:body_bin(M4)),
986+
?assertMatch(#{durable := false}, amqp10_msg:headers(M4)),
987+
[ok = amqp10_client:accept_msg(Receiver, M) || M <- Msgs],
988+
989+
ok = detach_link_sync(Receiver),
990+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
991+
close(Init).
992+
919993
invalid_transfer_settled_flag(Config) ->
920994
OpnConf = connection_config(Config),
921995
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@@ -1301,7 +1375,7 @@ amqp_amqpl(QType, Config) ->
13011375
Body6 = [#'v1_0.data'{content = <<0, 1>>},
13021376
#'v1_0.data'{content = <<2, 3>>}],
13031377

1304-
%% Send only body sections
1378+
%% Send only header and body sections
13051379
[ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<>>, Body, true)) ||
13061380
Body <- [Body1, Body2, Body3, Body4, Body5, Body6]],
13071381
%% Send with application-properties
@@ -1342,6 +1416,11 @@ amqp_amqpl(QType, Config) ->
13421416
#{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>},
13431417
{utf8, <<"e2">>}]}},
13441418
amqp10_msg:new(<<>>, Body1, true))),
1419+
ok = amqp10_client:send_msg(
1420+
Sender,
1421+
amqp10_msg:set_headers(
1422+
#{durable => false},
1423+
amqp10_msg:new(<<>>, Body1, true))),
13451424

13461425
ok = amqp10_client:detach_link(Sender),
13471426
flush(detached),
@@ -1365,8 +1444,10 @@ amqp_amqpl(QType, Config) ->
13651444
receive {#'basic.deliver'{consumer_tag = CTag,
13661445
redelivered = false},
13671446
#amqp_msg{payload = Payload1,
1368-
props = #'P_basic'{type = <<"amqp-1.0">>}}} ->
1369-
?assertEqual([Body1], amqp10_framing:decode_bin(Payload1))
1447+
props = #'P_basic'{delivery_mode = DelMode2,
1448+
type = <<"amqp-1.0">>}}} ->
1449+
?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)),
1450+
?assertEqual(2, DelMode2)
13701451
after 30000 -> ct:fail({missing_deliver, ?LINE})
13711452
end,
13721453
receive {_, #amqp_msg{payload = Payload2,
@@ -1428,6 +1509,12 @@ amqp_amqpl(QType, Config) ->
14281509
rabbit_misc:table_lookup(Headers11, <<"x-array">>))
14291510
after 30000 -> ct:fail({missing_deliver, ?LINE})
14301511
end,
1512+
receive {_, #amqp_msg{payload = Payload12,
1513+
props = #'P_basic'{delivery_mode = DelMode1}}} ->
1514+
?assertEqual([Body1], amqp10_framing:decode_bin(Payload12)),
1515+
?assertNotEqual(2, DelMode1)
1516+
after 30000 -> ct:fail({missing_deliver, ?LINE})
1517+
end,
14311518

14321519
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
14331520
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
@@ -1514,10 +1601,17 @@ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) ->
15141601
amqp_channel:cast(
15151602
Ch,
15161603
#'basic.publish'{routing_key = QName},
1517-
#amqp_msg{props = #'P_basic'{headers = Amqp091Headers},
1604+
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
1605+
priority = 5,
1606+
headers = Amqp091Headers},
15181607
payload = <<"foobar">>}),
15191608

15201609
{ok, [Msg]} = drain_queue(Session, Address, 1),
1610+
1611+
?assertMatch(#{durable := true,
1612+
priority := 5},
1613+
amqp10_msg:headers(Msg)),
1614+
15211615
Amqp10MA = amqp10_msg:message_annotations(Msg),
15221616
?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10MA, undefined)),
15231617
?assertEqual(92, maps:get(<<"x-int">>, Amqp10MA, undefined)),
@@ -3278,7 +3372,7 @@ max_message_size_client_to_server(Config) ->
32783372
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed),
32793373
ok = wait_for_credit(Sender),
32803374

3281-
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10),
3375+
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 20),
32823376
?assertEqual(ok,
32833377
amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))),
32843378
ok = wait_for_accepted(<<"t1">>),

0 commit comments

Comments
 (0)