Skip to content

Commit d6d11a2

Browse files
Merge pull request #13732 from rabbitmq/mergify/bp/v4.0.x/pr-13729
Fix concurrent AMQP queue declarations (backport #13727) (backport #13729)
2 parents 268afdd + fe06714 commit d6d11a2

File tree

2 files changed

+50
-12
lines changed

2 files changed

+50
-12
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ handle_http_req(HttpMethod = <<"PUT">>,
127127
PermCache1 = check_resource_access(QName, configure, User, PermCache0),
128128
rabbit_core_metrics:queue_declared(QName),
129129

130-
{Q1, NumMsgs, NumConsumers, StatusCode, PermCache} =
131130
case rabbit_amqqueue:with(
132131
QName,
133132
fun(Q) ->
134133
try rabbit_amqqueue:assert_equivalence(
135134
Q, Durable, AutoDelete, QArgs, Owner) of
136135
ok ->
137136
{ok, Msgs, Consumers} = rabbit_amqqueue:stat(Q),
138-
{ok, {Q, Msgs, Consumers, <<"200">>, PermCache1}}
137+
RespPayload = encode_queue(Q, Msgs, Consumers),
138+
{ok, {<<"200">>, RespPayload, {PermCache1, TopicPermCache}}}
139139
catch exit:#amqp_error{name = precondition_failed,
140140
explanation = Expl} ->
141141
throw(<<"409">>, Expl, []);
@@ -146,23 +146,26 @@ handle_http_req(HttpMethod = <<"PUT">>,
146146
{ok, Result} ->
147147
Result;
148148
{error, not_found} ->
149-
PermCache2 = check_dead_letter_exchange(QName, QArgs, User, PermCache1),
149+
PermCache = check_dead_letter_exchange(QName, QArgs, User, PermCache1),
150+
PermCaches = {PermCache, TopicPermCache},
150151
try rabbit_amqqueue:declare(
151152
QName, Durable, AutoDelete, QArgs, Owner, Username) of
152153
{new, Q} ->
153154
rabbit_core_metrics:queue_created(QName),
154-
{Q, 0, 0, <<"201">>, PermCache2};
155+
RespPayload = encode_queue(Q, 0, 0),
156+
{<<"201">>, RespPayload, PermCaches};
155157
{owner_died, Q} ->
156158
%% Presumably our own days are numbered since the
157159
%% connection has died. Pretend the queue exists though,
158160
%% just so nothing fails.
159-
{Q, 0, 0, <<"201">>, PermCache2};
161+
RespPayload = encode_queue(Q, 0, 0),
162+
{<<"201">>, RespPayload, PermCaches};
160163
{absent, Q, Reason} ->
161164
absent(Q, Reason);
162165
{existing, _Q} ->
163166
%% Must have been created in the meantime. Loop around again.
164167
handle_http_req(HttpMethod, PathSegments, Query, ReqPayload,
165-
Vhost, User, ConnPid, {PermCache2, TopicPermCache});
168+
Vhost, User, ConnPid, PermCaches);
166169
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
167170
throw(<<"403">>,
168171
Reason,
@@ -177,10 +180,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
177180
end;
178181
{error, {absent, Q, Reason}} ->
179182
absent(Q, Reason)
180-
end,
181-
182-
RespPayload = encode_queue(Q1, NumMsgs, NumConsumers),
183-
{StatusCode, RespPayload, {PermCache, TopicPermCache}};
183+
end;
184184

185185
handle_http_req(<<"PUT">>,
186186
[<<"exchanges">>, XNameBinQuoted],

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ groups() ->
5252
bad_exchange_property,
5353
bad_exchange_type,
5454
get_queue_not_found,
55+
declare_queues_concurrently,
5556
declare_queue_default_queue_type,
5657
declare_queue_empty_name,
5758
declare_queue_line_feed,
@@ -436,6 +437,40 @@ get_queue_not_found(Config) ->
436437
amqp10_msg:body(Resp)),
437438
ok = cleanup(Init).
438439

440+
declare_queues_concurrently(Config) ->
441+
NumQueues = 5,
442+
{Pid1, Ref1} = spawn_monitor(?MODULE, declare_queues, [Config, NumQueues]),
443+
{Pid2, Ref2} = spawn_monitor(?MODULE, declare_queues, [Config, NumQueues]),
444+
receive {'DOWN', Ref1, process, Pid1, Reason1} ->
445+
?assertEqual(normal, Reason1)
446+
end,
447+
receive {'DOWN', Ref2, process, Pid2, Reason2} ->
448+
?assertEqual(normal, Reason2)
449+
end,
450+
451+
?assertEqual(NumQueues, count_queues(Config)),
452+
453+
Init = {_, LinkPair} = init(Config),
454+
lists:foreach(fun(N) ->
455+
Bin = integer_to_binary(N),
456+
QName = <<"queue-", Bin/binary>>,
457+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName)
458+
end, lists:seq(1, NumQueues)),
459+
ok = cleanup(Init).
460+
461+
declare_queues(Config, Num) ->
462+
Init = {_, LinkPair} = init(Config),
463+
ok = declare_queues0(LinkPair, Num),
464+
ok = cleanup(Init).
465+
466+
declare_queues0(_LinkPair, 0) ->
467+
ok;
468+
declare_queues0(LinkPair, Left) ->
469+
Bin = integer_to_binary(Left),
470+
QName = <<"queue-", Bin/binary>>,
471+
?assertMatch({ok, _}, rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{})),
472+
declare_queues0(LinkPair, Left - 1).
473+
439474
declare_queue_default_queue_type(Config) ->
440475
Node = get_node_config(Config, 0, nodename),
441476
Vhost = QName = atom_to_binary(?FUNCTION_NAME),
@@ -864,11 +899,11 @@ pipeline(Config) ->
864899
%% because RabbitMQ grants us 8 link credits initially.
865900
Num = 8,
866901
pipeline0(Num, LinkPair, <<"PUT">>, {map, []}),
867-
eventually(?_assertEqual(Num, rpc(Config, rabbit_amqqueue, count, [])), 200, 20),
902+
eventually(?_assertEqual(Num, count_queues(Config)), 200, 20),
868903
flush(queues_created),
869904

870905
pipeline0(Num, LinkPair, <<"DELETE">>, null),
871-
eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])), 200, 20),
906+
eventually(?_assertEqual(0, count_queues(Config)), 200, 20),
872907
flush(queues_deleted),
873908

874909
ok = cleanup(Init).
@@ -1120,3 +1155,6 @@ gen_server_state(Pid) ->
11201155
L1 = lists:last(L0),
11211156
{data, L2} = lists:last(L1),
11221157
proplists:get_value("State", L2).
1158+
1159+
count_queues(Config) ->
1160+
rpc(Config, rabbit_amqqueue, count, []).

0 commit comments

Comments
 (0)