Skip to content

Commit 606ae95

Browse files
committed
Make Erlang AMQP library safe by default
By default we want AMQP libraries to send messages as durable. The app can always choose to opt in to send the message as non-durable.
1 parent b5ad5c2 commit 606ae95

File tree

2 files changed

+80
-11
lines changed

2 files changed

+80
-11
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,24 +266,25 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
266266
%% following stucture:
267267
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
268268
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
269-
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
270-
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
271-
settled = Settled,
272-
message_format = {uint, ?MESSAGE_FORMAT}},
273-
body = [#'v1_0.data'{content = Body}]};
269+
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
270+
Body = [#'v1_0.data'{content = Bin}],
271+
new(DeliveryTag, Body, Settled);
274272
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
275-
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
276-
settled = Settled,
277-
message_format = {uint, ?MESSAGE_FORMAT}},
278-
body = Body}.
273+
#amqp10_msg{
274+
transfer = #'v1_0.transfer'{
275+
delivery_tag = {binary, DeliveryTag},
276+
settled = Settled,
277+
message_format = {uint, ?MESSAGE_FORMAT}},
278+
%% This lib is safe by default.
279+
header = #'v1_0.header'{durable = true},
280+
body = Body}.
279281

280282
%% @doc Create a new settled amqp10 message using the specified delivery tag
281283
%% and body.
282284
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
283285
new(DeliveryTag, Body) ->
284286
new(DeliveryTag, Body, false).
285287

286-
287288
% First 3 octets are the format
288289
% the last 1 octet is the version
289290
% See 2.8.11 in the spec

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ groups() ->
5858
sender_settle_mode_unsettled,
5959
sender_settle_mode_unsettled_fanout,
6060
sender_settle_mode_mixed,
61+
durable_field_classic_queue,
62+
durable_field_quorum_queue,
63+
durable_field_stream,
6164
invalid_transfer_settled_flag,
6265
quorum_queue_rejects,
6366
receiver_settle_mode_first,
@@ -916,6 +919,71 @@ sender_settle_mode_mixed(Config) ->
916919
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
917920
ok = close(Init).
918921

922+
durable_field_classic_queue(Config) ->
923+
QName = atom_to_binary(?FUNCTION_NAME),
924+
durable_field(Config, <<"classic">>, QName).
925+
926+
durable_field_quorum_queue(Config) ->
927+
QName = atom_to_binary(?FUNCTION_NAME),
928+
durable_field(Config, <<"quorum">>, QName).
929+
930+
durable_field_stream(Config) ->
931+
QName = atom_to_binary(?FUNCTION_NAME),
932+
durable_field(Config, <<"stream">>, QName).
933+
934+
durable_field(Config, QueueType, QName)
935+
when is_binary(QueueType) ->
936+
Address = rabbitmq_amqp_address:queue(QName),
937+
{_Connection, Session, LinkPair} = Init = init(Config),
938+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}},
939+
{ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
940+
{ok, Sender} = amqp10_client:attach_sender_link(
941+
Session, <<"test-sender">>, Address, unsettled),
942+
wait_for_credit(Sender),
943+
944+
ok = amqp10_client:send_msg(
945+
Sender,
946+
amqp10_msg:set_headers(
947+
#{durable => true},
948+
amqp10_msg:new(<<"t1">>, <<"durable">>))),
949+
ok = amqp10_client:send_msg(
950+
Sender,
951+
amqp10_msg:set_headers(
952+
#{durable => false},
953+
amqp10_msg:new(<<"t2">>, <<"non-durable">>))),
954+
%% Even though the AMQP spec defines durable=false as default
955+
%% (i.e. durable is false if the field is omitted on the wire),
956+
%% we expect our AMQP Erlang library to be safe by default,
957+
%% and therefore send the message as durable=true on behalf of us.
958+
ok = amqp10_client:send_msg(
959+
Sender, amqp10_msg:new(<<"t3">>, <<"lib sends as durable by default">>)),
960+
961+
ok = wait_for_accepts(3),
962+
ok = detach_link_sync(Sender),
963+
flush(sent),
964+
965+
Filter = consume_from_first(QueueType),
966+
{ok, Receiver} = amqp10_client:attach_receiver_link(
967+
Session, <<"test-receiver">>, Address, unsettled,
968+
none, Filter),
969+
970+
ok = amqp10_client:flow_link_credit(Receiver, 3, never),
971+
[M1, M2, M3] = receive_messages(Receiver, 3),
972+
?assertEqual(<<"durable">>, amqp10_msg:body_bin(M1)),
973+
?assertMatch(#{durable := true}, amqp10_msg:headers(M1)),
974+
?assertEqual(<<"non-durable">>, amqp10_msg:body_bin(M2)),
975+
?assertMatch(#{durable := false}, amqp10_msg:headers(M2)),
976+
?assertEqual(<<"lib sends as durable by default">>, amqp10_msg:body_bin(M3)),
977+
?assertMatch(#{durable := true}, amqp10_msg:headers(M3)),
978+
979+
ok = amqp10_client:accept_msg(Receiver, M1),
980+
ok = amqp10_client:accept_msg(Receiver, M2),
981+
ok = amqp10_client:accept_msg(Receiver, M3),
982+
983+
ok = detach_link_sync(Receiver),
984+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
985+
close(Init).
986+
919987
invalid_transfer_settled_flag(Config) ->
920988
OpnConf = connection_config(Config),
921989
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@@ -3278,7 +3346,7 @@ max_message_size_client_to_server(Config) ->
32783346
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed),
32793347
ok = wait_for_credit(Sender),
32803348

3281-
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10),
3349+
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 20),
32823350
?assertEqual(ok,
32833351
amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))),
32843352
ok = wait_for_accepted(<<"t1">>),

0 commit comments

Comments
 (0)