Skip to content

Commit 35b5ab3

Browse files
committed
Determine queue topology without checking queue type
## What? This commit determines the queue topology without checking the queue type. ## Why? This way, checking leader and replicas works the same across all queue types without the need to introduce other rabbit_queue_type behaviour as suggested in other PRs. ## How? pid is the leader, nodes in queue_type_states are the members/replicas. This commit results in an unknown stream leader during queue declaration. However the correct leader will be returned eventually when calling GET on the stream.
1 parent 6f5c8e0 commit 35b5ab3

File tree

2 files changed

+41
-44
lines changed

2 files changed

+41
-44
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -444,55 +444,40 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
444444
ShortName ->
445445
ShortName
446446
end}},
447-
{{utf8, <<"arguments">>}, QArgs}
447+
{{utf8, <<"arguments">>}, QArgs},
448+
{{utf8, <<"replicas">>},
449+
{array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]}
450+
}
448451
],
449-
KVList1 = if is_list(Replicas) ->
450-
[{{utf8, <<"replicas">>},
451-
{array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]}
452-
} | KVList0];
453-
Replicas =:= undefined ->
454-
KVList0
455-
end,
456452
KVList = case Leader of
457-
undefined ->
458-
KVList1;
453+
none ->
454+
KVList0;
459455
_ ->
460456
[{{utf8, <<"leader">>},
461457
{utf8, atom_to_binary(Leader)}
462-
} | KVList1]
458+
} | KVList0]
463459
end,
464460
{map, KVList}.
465461

466462
%% The returned Replicas contain both online and offline replicas.
467463
-spec queue_topology(amqqueue:amqqueue()) ->
468-
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
464+
{Leader :: node() | none, Replicas :: [node(),...]}.
469465
queue_topology(Q) ->
470-
case amqqueue:get_type(Q) of
471-
rabbit_quorum_queue ->
472-
[{leader, Leader0},
473-
{members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
474-
Leader = case Leader0 of
475-
'' -> undefined;
476-
_ -> Leader0
477-
end,
478-
{Leader, Members};
479-
rabbit_stream_queue ->
480-
#{name := StreamId} = amqqueue:get_type_state(Q),
481-
case rabbit_stream_coordinator:members(StreamId) of
482-
{ok, Members} ->
483-
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
484-
{Node, [Node | Replicas]};
485-
(Node, {_Pid, replica}, {Writer, Replicas}) ->
486-
{Writer, [Node | Replicas]}
487-
end, {undefined, []}, Members);
488-
{error, _} ->
489-
{undefined, undefined}
490-
end;
491-
_ ->
492-
Pid = amqqueue:get_pid(Q),
493-
Node = node(Pid),
494-
{Node, [Node]}
495-
end.
466+
Leader = case amqqueue:get_pid(Q) of
467+
{_RaName, Node} ->
468+
Node;
469+
none ->
470+
none;
471+
Pid ->
472+
node(Pid)
473+
end,
474+
Replicas = case amqqueue:get_type_state(Q) of
475+
#{nodes := Nodes} ->
476+
Nodes;
477+
_ ->
478+
[Leader]
479+
end,
480+
{Leader, Replicas}.
496481

497482
decode_exchange({map, KVList}) ->
498483
M = lists:foldl(

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -803,17 +803,29 @@ queue_topology(Config) ->
803803
{ok, QQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, QQName, QQProps),
804804
{ok, SQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, SQName, SQProps),
805805

806-
%% The default queue leader strategy is client-local.
807-
?assertEqual({ok, N0}, maps:find(leader, CQInfo0)),
808-
?assertEqual({ok, N0}, maps:find(leader, QQInfo0)),
809-
?assertEqual({ok, N0}, maps:find(leader, SQInfo0)),
810-
811806
?assertEqual({ok, [N0]}, maps:find(replicas, CQInfo0)),
812807
{ok, QQReplicas0} = maps:find(replicas, QQInfo0),
813808
?assertEqual(Nodes, lists:usort(QQReplicas0)),
814809
{ok, SQReplicas0} = maps:find(replicas, SQInfo0),
815810
?assertEqual(Nodes, lists:usort(SQReplicas0)),
816811

812+
%% The default queue leader strategy is client-local.
813+
?assertEqual({ok, N0}, maps:find(leader, CQInfo0)),
814+
eventually(
815+
?_assert(
816+
begin
817+
{ok, QQInfo1} = rabbitmq_amqp_client:get_queue(LinkPair0, QQName),
818+
{ok, SQInfo1} = rabbitmq_amqp_client:get_queue(LinkPair0, SQName),
819+
QQLeader = maps:get(leader, QQInfo1),
820+
SQLeader = maps:get(leader, SQInfo1),
821+
ct:pal("quorum queue leader: ~s~n"
822+
"stream leader: ~s",
823+
[QQLeader, SQLeader]),
824+
QQLeader =:= N0 andalso
825+
SQLeader =:= N0
826+
end
827+
), 2000, 5),
828+
817829
ok = cleanup(Init0),
818830
ok = rabbit_ct_broker_helpers:stop_node(Config, 0),
819831

@@ -841,7 +853,7 @@ queue_topology(Config) ->
841853
(QQLeader =:= N1 orelse QQLeader =:= N2) andalso
842854
(SQLeader =:= N1 orelse SQLeader =:= N2)
843855
end
844-
), 1000, 5),
856+
), 2000, 5),
845857

846858
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
847859
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, CQName),

0 commit comments

Comments
 (0)