Skip to content

Commit 6eb1f87

Browse files
authored
Fix concurrent AMQP queue declarations (#13727)
* Fix concurrent AMQP queue declarations Prior to this commit, when AMQP clients declared the same queues concurrently, the following crash occurred: ``` │ *Error{Condition: amqp:internal-error, Description: {badmatch,{<<"200">>, │ {map,[{{utf8,<<"leader">>},{utf8,<<"rabbit-2@carrot">>}}, │ {{utf8,<<"message_count">>},{ulong,0}}, │ {{utf8,<<"consumer_count">>},{uint,0}}, │ {{utf8,<<"name">>},{utf8,<<"cq-145">>}}, │ {{utf8,<<"vhost">>},{utf8,<<"/">>}}, │ {{utf8,<<"durable">>},{boolean,true}}, │ {{utf8,<<"auto_delete">>},{boolean,false}}, │ {{utf8,<<"exclusive">>},{boolean,false}}, │ {{utf8,<<"type">>},{utf8,<<"classic">>}}, │ {{utf8,<<"arguments">>}, │ {map,[{{utf8,<<"x-queue-type">>},{utf8,<<"classic">>}}]}}, │ {{utf8,<<"replicas">>}, │ {array,utf8,[{utf8,<<"rabbit-2@carrot">>}]}}]}, │ {[{{resource,<<"/">>,queue,<<"cq-145">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-144">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-143">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-142">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-141">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-140">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-139">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-138">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-137">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-136">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-135">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-134">>},configure}], │ []}}} │ [{rabbit_amqp_management,handle_http_req,8, │ [{file,"rabbit_amqp_management.erl"},{line,130}]}, │ {rabbit_amqp_management,handle_request,5, │ [{file,"rabbit_amqp_management.erl"},{line,43}]}, │ {rabbit_amqp_session,incoming_mgmt_link_transfer,3, │ [{file,"rabbit_amqp_session.erl"},{line,2317}]}, │ {rabbit_amqp_session,handle_frame,2, │ [{file,"rabbit_amqp_session.erl"},{line,963}]}, │ {rabbit_amqp_session,handle_cast,2, │ [{file,"rabbit_amqp_session.erl"},{line,539}]}, │ {gen_server,try_handle_cast,3,[{file,"gen_server.erl"},{line,2371}]}, │ {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,2433}]}, │ {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}], Info: map[]} ``` To repro, run the following command in parallel in two separate terminals: ``` ./omq amqp -x 10000 -t /queues/cq-%d -y 0 -C 0 --queues classic classic ``` * Simplify
1 parent 4a122df commit 6eb1f87

File tree

2 files changed

+50
-12
lines changed

2 files changed

+50
-12
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ handle_http_req(HttpMethod = <<"PUT">>,
127127
PermCache1 = check_resource_access(QName, configure, User, PermCache0),
128128
rabbit_core_metrics:queue_declared(QName),
129129

130-
{Q1, NumMsgs, NumConsumers, StatusCode, PermCache} =
131130
case rabbit_amqqueue:with(
132131
QName,
133132
fun(Q) ->
134133
try rabbit_amqqueue:assert_equivalence(
135134
Q, Durable, AutoDelete, QArgs, Owner) of
136135
ok ->
137136
{ok, Msgs, Consumers} = rabbit_amqqueue:stat(Q),
138-
{ok, {Q, Msgs, Consumers, <<"200">>, PermCache1}}
137+
RespPayload = encode_queue(Q, Msgs, Consumers),
138+
{ok, {<<"200">>, RespPayload, {PermCache1, TopicPermCache}}}
139139
catch exit:#amqp_error{name = precondition_failed,
140140
explanation = Expl} ->
141141
throw(<<"409">>, Expl, []);
@@ -146,23 +146,26 @@ handle_http_req(HttpMethod = <<"PUT">>,
146146
{ok, Result} ->
147147
Result;
148148
{error, not_found} ->
149-
PermCache2 = check_dead_letter_exchange(QName, QArgs, User, PermCache1),
149+
PermCache = check_dead_letter_exchange(QName, QArgs, User, PermCache1),
150+
PermCaches = {PermCache, TopicPermCache},
150151
try rabbit_amqqueue:declare(
151152
QName, Durable, AutoDelete, QArgs, Owner, Username) of
152153
{new, Q} ->
153154
rabbit_core_metrics:queue_created(QName),
154-
{Q, 0, 0, <<"201">>, PermCache2};
155+
RespPayload = encode_queue(Q, 0, 0),
156+
{<<"201">>, RespPayload, PermCaches};
155157
{owner_died, Q} ->
156158
%% Presumably our own days are numbered since the
157159
%% connection has died. Pretend the queue exists though,
158160
%% just so nothing fails.
159-
{Q, 0, 0, <<"201">>, PermCache2};
161+
RespPayload = encode_queue(Q, 0, 0),
162+
{<<"201">>, RespPayload, PermCaches};
160163
{absent, Q, Reason} ->
161164
absent(Q, Reason);
162165
{existing, _Q} ->
163166
%% Must have been created in the meantime. Loop around again.
164167
handle_http_req(HttpMethod, PathSegments, Query, ReqPayload,
165-
Vhost, User, ConnPid, {PermCache2, TopicPermCache});
168+
Vhost, User, ConnPid, PermCaches);
166169
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
167170
throw(<<"403">>,
168171
Reason,
@@ -177,10 +180,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
177180
end;
178181
{error, {absent, Q, Reason}} ->
179182
absent(Q, Reason)
180-
end,
181-
182-
RespPayload = encode_queue(Q1, NumMsgs, NumConsumers),
183-
{StatusCode, RespPayload, {PermCache, TopicPermCache}};
183+
end;
184184

185185
handle_http_req(<<"PUT">>,
186186
[<<"exchanges">>, XNameBinQuoted],

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ groups() ->
5252
bad_exchange_property,
5353
bad_exchange_type,
5454
get_queue_not_found,
55+
declare_queues_concurrently,
5556
declare_queue_default_queue_type,
5657
declare_queue_empty_name,
5758
declare_queue_line_feed,
@@ -432,6 +433,40 @@ get_queue_not_found(Config) ->
432433
amqp10_msg:body(Resp)),
433434
ok = cleanup(Init).
434435

436+
declare_queues_concurrently(Config) ->
437+
NumQueues = 5,
438+
{Pid1, Ref1} = spawn_monitor(?MODULE, declare_queues, [Config, NumQueues]),
439+
{Pid2, Ref2} = spawn_monitor(?MODULE, declare_queues, [Config, NumQueues]),
440+
receive {'DOWN', Ref1, process, Pid1, Reason1} ->
441+
?assertEqual(normal, Reason1)
442+
end,
443+
receive {'DOWN', Ref2, process, Pid2, Reason2} ->
444+
?assertEqual(normal, Reason2)
445+
end,
446+
447+
?assertEqual(NumQueues, count_queues(Config)),
448+
449+
Init = {_, LinkPair} = init(Config),
450+
lists:foreach(fun(N) ->
451+
Bin = integer_to_binary(N),
452+
QName = <<"queue-", Bin/binary>>,
453+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName)
454+
end, lists:seq(1, NumQueues)),
455+
ok = cleanup(Init).
456+
457+
declare_queues(Config, Num) ->
458+
Init = {_, LinkPair} = init(Config),
459+
ok = declare_queues0(LinkPair, Num),
460+
ok = cleanup(Init).
461+
462+
declare_queues0(_LinkPair, 0) ->
463+
ok;
464+
declare_queues0(LinkPair, Left) ->
465+
Bin = integer_to_binary(Left),
466+
QName = <<"queue-", Bin/binary>>,
467+
?assertMatch({ok, _}, rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{})),
468+
declare_queues0(LinkPair, Left - 1).
469+
435470
declare_queue_default_queue_type(Config) ->
436471
Node = get_node_config(Config, 0, nodename),
437472
Vhost = QName = atom_to_binary(?FUNCTION_NAME),
@@ -871,11 +906,11 @@ pipeline(Config) ->
871906
%% because RabbitMQ grants us 8 link credits initially.
872907
Num = 8,
873908
pipeline0(Num, LinkPair, <<"PUT">>, {map, []}),
874-
eventually(?_assertEqual(Num, rpc(Config, rabbit_amqqueue, count, [])), 200, 20),
909+
eventually(?_assertEqual(Num, count_queues(Config)), 200, 20),
875910
flush(queues_created),
876911

877912
pipeline0(Num, LinkPair, <<"DELETE">>, null),
878-
eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])), 200, 20),
913+
eventually(?_assertEqual(0, count_queues(Config)), 200, 20),
879914
flush(queues_deleted),
880915

881916
ok = cleanup(Init).
@@ -1127,3 +1162,6 @@ gen_server_state(Pid) ->
11271162
L1 = lists:last(L0),
11281163
{data, L2} = lists:last(L1),
11291164
proplists:get_value("State", L2).
1165+
1166+
count_queues(Config) ->
1167+
rpc(Config, rabbit_amqqueue, count, []).

0 commit comments

Comments
 (0)