Skip to content

Commit 7289487

Browse files
Merge pull request #13946 from rabbitmq/ik-qq-reconiliation
QQ member reconciliation: switch to gen_event events instead of hardcoded triggers
2 parents b527fc3 + a71d0f9 commit 7289487

5 files changed

+141
-41
lines changed

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,6 @@ handle_dead_rabbit(Node, State) ->
857857
%% statements on *one* node, rather than all of them.
858858
ok = rabbit_amqqueue:on_node_down(Node),
859859
ok = rabbit_alarm:on_node_down(Node),
860-
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_down(Node),
861860
State1 = case rabbit_khepri:is_enabled() of
862861
true -> State;
863862
false -> on_node_down_using_mnesia(Node, State)
@@ -898,8 +897,7 @@ handle_live_rabbit(Node) ->
898897
true -> ok;
899898
false -> on_node_up_using_mnesia(Node)
900899
end,
901-
ok = rabbit_vhosts:on_node_up(Node),
902-
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_up(Node).
900+
ok = rabbit_vhosts:on_node_up(Node).
903901

904902
on_node_up_using_mnesia(Node) ->
905903
ok = rabbit_mnesia:on_node_up(Node).

deps/rabbit/src/rabbit_policy.erl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,13 +378,11 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
378378
notify(VHost, <<"policy">>, Name, Term0, ActingUser) ->
379379
Term = rabbit_data_coercion:atomize_keys(Term0),
380380
update_matched_objects(VHost, Term, ActingUser),
381-
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
382381
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
383382
{user_who_performed_action, ActingUser} | Term]);
384383
notify(VHost, <<"operator_policy">>, Name, Term0, ActingUser) ->
385384
Term = rabbit_data_coercion:atomize_keys(Term0),
386385
update_matched_objects(VHost, Term, ActingUser),
387-
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
388386
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
389387
{user_who_performed_action, ActingUser} | Term]).
390388

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_quorum_event_subscriber).
9+
10+
-behaviour(gen_event).
11+
12+
-export([init/1, handle_event/2, handle_call/2]).
13+
-export([register/0, unregister/0]).
14+
15+
-include_lib("rabbit_common/include/rabbit.hrl").
16+
17+
-rabbit_boot_step({rabbit_quorum_event_subscriber,
18+
[{description, "quorum queue event subscriber"},
19+
{mfa, {?MODULE, register, []}},
20+
{cleanup, {?MODULE, unregister, []}},
21+
{requires, rabbit_event},
22+
{enables, recovery}]}).
23+
24+
register() ->
25+
gen_event:add_handler(rabbit_alarm, ?MODULE, []),
26+
gen_event:add_handler(rabbit_event, ?MODULE, []).
27+
28+
unregister() ->
29+
gen_event:delete_handler(rabbit_alarm, ?MODULE, []),
30+
gen_event:delete_handler(rabbit_event, ?MODULE, []).
31+
32+
init([]) ->
33+
{ok, []}.
34+
35+
handle_call( _, State) ->
36+
{ok, ok, State}.
37+
38+
handle_event({node_up, Node}, State) ->
39+
rabbit_quorum_queue_periodic_membership_reconciliation:on_node_up(Node),
40+
{ok, State};
41+
handle_event({node_down, Node}, State) ->
42+
rabbit_quorum_queue_periodic_membership_reconciliation:on_node_down(Node),
43+
{ok, State};
44+
handle_event(#event{type = policy_set}, State) ->
45+
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
46+
{ok, State};
47+
handle_event(#event{type = operator_policy_set}, State) ->
48+
rabbit_quorum_queue_periodic_membership_reconciliation:policy_set(),
49+
{ok, State};
50+
handle_event(_, State) ->
51+
{ok, State}.

deps/rabbit/src/rabbit_quorum_queue_periodic_membership_reconciliation.erl

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
2020
code_change/3]).
2121

22+
-include_lib("kernel/include/logger.hrl").
23+
2224
-define(SERVER, ?MODULE).
2325
-define(DEFAULT_INTERVAL, 60_000*60).
2426
-define(DEFAULT_TRIGGER_INTERVAL, 10_000).
@@ -91,8 +93,7 @@ handle_cast({membership_reconciliation_trigger, _Reason}, #state{enabled = false
9193
{noreply, State, hibernate};
9294
handle_cast({membership_reconciliation_trigger, Reason}, #state{timer_ref = OldRef,
9395
trigger_interval = Time} = State) ->
94-
rabbit_log:debug("Quorum Queue membership reconciliation triggered: ~p",
95-
[Reason]),
96+
?LOG_DEBUG("Quorum Queue membership reconciliation scheduled: ~p", [Reason]),
9697
_ = erlang:cancel_timer(OldRef),
9798
Ref = erlang:send_after(Time, self(), ?EVAL_MSG),
9899
{noreply, State#state{timer_ref = Ref}};
@@ -158,7 +159,7 @@ reconciliate_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders],
158159
end
159160
else
160161
{timeout, Reason} ->
161-
rabbit_log:debug("Find leader timeout: ~p", [Reason]),
162+
?LOG_DEBUG("Find leader timeout: ~p", [Reason]),
162163
ok;
163164
_ ->
164165
noop
@@ -184,13 +185,13 @@ maybe_add_member(Q, Running, MemberNodes, TargetSize) ->
184185
QName = amqqueue:get_name(Q),
185186
case rabbit_quorum_queue:add_member(Q, Node) of
186187
ok ->
187-
rabbit_log:debug(
188+
?LOG_DEBUG(
188189
"Added node ~ts as a member to ~ts as "
189190
"the queues target group size(#~w) is not met and "
190191
"there are enough new nodes(#~w) in the cluster",
191192
[Node, rabbit_misc:rs(QName), TargetSize, length(New)]);
192193
{error, Err} ->
193-
rabbit_log:warning(
194+
?LOG_WARNING(
194195
"~ts: failed to add member (replica) on node ~w, error: ~w",
195196
[rabbit_misc:rs(QName), Node, Err])
196197
end,
@@ -235,12 +236,12 @@ remove_members(Q, [Node | Nodes]) ->
235236
case rabbit_quorum_queue:delete_member(Q, Node) of
236237
ok ->
237238
QName = amqqueue:get_name(Q),
238-
rabbit_log:debug("~ts: Successfully removed member (replica) on node ~w",
239+
?LOG_DEBUG("~ts: Successfully removed member (replica) on node ~w",
239240
[rabbit_misc:rs(QName), Node]),
240241
ok;
241242
{error, Err} ->
242243
QName = amqqueue:get_name(Q),
243-
rabbit_log:warning("~ts: failed to remove member (replica) on node "
244+
?LOG_DEBUG("~ts: failed to remove member (replica) on node "
244245
"~w, error: ~w",
245246
[rabbit_misc:rs(QName), Node, Err])
246247
end,

deps/rabbit/test/quorum_queue_member_reconciliation_SUITE.erl

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,71 @@
1212
-include_lib("amqp_client/include/amqp_client.hrl").
1313
-compile([nowarn_export_all, export_all]).
1414

15+
%% The reconciler has two modes of triggering itself
16+
%% - timer based
17+
%% - event based
18+
%% The default config of this test has Interval very short - 5 second which is lower than
19+
%% wait_until timeout. Meaninig that even if all domain triggers (node_up/down, policy_set, etc)
20+
%% are disconnected tests would be still green.
21+
%% So to test triggers it is essential to set Interval high enough (the very default value of 60 minutes is perfect)
22+
%%
23+
%% TODO: test `policy_set` trigger
1524

1625
all() ->
1726
[
18-
{group, unclustered}
27+
{group, unclustered},
28+
{group, unclustered_triggers}
1929
].
2030

2131
groups() ->
2232
[
23-
{unclustered, [],
33+
{unclustered, [], %% low interval, even if triggers do not work all tests should pass
2434
[
2535
{quorum_queue_3, [], [auto_grow, auto_grow_drained_node, auto_shrink]}
36+
]},
37+
%% uses an interval longer than `wait_until` (30s by default)
38+
{unclustered_triggers, [],
39+
[
40+
%% see also `auto_grow_drained_node`
41+
{quorum_queue_3, [], [auto_grow, auto_shrink]}
2642
]}
2743
].
2844

2945
%% -------------------------------------------------------------------
3046
%% Testsuite setup/teardown.
3147
%% -------------------------------------------------------------------
3248

33-
init_per_suite(Config0) ->
49+
init_per_suite(Config) ->
3450
rabbit_ct_helpers:log_environment(),
51+
rabbit_ct_helpers:run_setup_steps(Config, []).
52+
53+
end_per_suite(Config) ->
54+
rabbit_ct_helpers:run_teardown_steps(Config).
55+
56+
init_per_group(unclustered, Config0) ->
3557
Config1 = rabbit_ct_helpers:merge_app_env(
3658
Config0, {rabbit, [{quorum_tick_interval, 1000},
3759
{quorum_membership_reconciliation_enabled, true},
3860
{quorum_membership_reconciliation_auto_remove, true},
3961
{quorum_membership_reconciliation_interval, 5000},
4062
{quorum_membership_reconciliation_trigger_interval, 2000},
4163
{quorum_membership_reconciliation_target_group_size, 3}]}),
42-
rabbit_ct_helpers:run_setup_steps(Config1, []).
43-
44-
end_per_suite(Config) ->
45-
rabbit_ct_helpers:run_teardown_steps(Config).
46-
init_per_group(unclustered, Config) ->
47-
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
64+
rabbit_ct_helpers:set_config(Config1, [{rmq_nodes_clustered, false}]);
65+
init_per_group(unclustered_triggers, Config0) ->
66+
Config1 = rabbit_ct_helpers:merge_app_env(
67+
Config0, {rabbit, [{quorum_tick_interval, 1000},
68+
{quorum_membership_reconciliation_enabled, true},
69+
{quorum_membership_reconciliation_auto_remove, true},
70+
{quorum_membership_reconciliation_interval, 50000},
71+
{quorum_membership_reconciliation_trigger_interval, 2000},
72+
{quorum_membership_reconciliation_target_group_size, 3}]}),
73+
%% shrink timeout is set here because without it, when a node stopped right after a queue was created,
74+
%% the test will pass without any triggers because cluster change will likely happen before the trigger_interval,
75+
%% scheduled in response to queue_created event.
76+
%% See also a comment in `auto_shrink/1`.
77+
rabbit_ct_helpers:set_config(Config1, [{rmq_nodes_clustered, false},
78+
{quorum_membership_reconciliation_interval, 50000},
79+
{shrink_timeout, 2000}]);
4880
init_per_group(Group, Config) ->
4981
ClusterSize = 3,
5082
Config1 = rabbit_ct_helpers:set_config(Config,
@@ -57,6 +89,8 @@ init_per_group(Group, Config) ->
5789

5890
end_per_group(unclustered, Config) ->
5991
Config;
92+
end_per_group(unclustered_triggers, Config) ->
93+
Config;
6094
end_per_group(_, Config) ->
6195
rabbit_ct_helpers:run_steps(Config,
6296
rabbit_ct_broker_helpers:teardown_steps()).
@@ -72,34 +106,17 @@ init_per_testcase(Testcase, Config) ->
72106
]),
73107
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
74108

75-
merge_app_env(Config) ->
76-
rabbit_ct_helpers:merge_app_env(
77-
rabbit_ct_helpers:merge_app_env(Config,
78-
{rabbit, [{core_metrics_gc_interval, 100}]}),
79-
{ra, [{min_wal_roll_over_interval, 30000}]}).
80-
81109
end_per_testcase(Testcase, Config) ->
82110
[Server0, Server1, Server2] =
83111
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
112+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
113+
amqp_channel:call(Ch, #'queue.delete'{queue = rabbit_data_coercion:to_binary(Testcase)}),
84114
reset_nodes([Server2, Server0], Server1),
85115
Config1 = rabbit_ct_helpers:run_steps(
86116
Config,
87117
rabbit_ct_client_helpers:teardown_steps()),
88118
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
89119

90-
reset_nodes([], _Leader) ->
91-
ok;
92-
reset_nodes([Node| Nodes], Leader) ->
93-
ok = rabbit_control_helper:command(stop_app, Node),
94-
case rabbit_control_helper:command(forget_cluster_node, Leader, [atom_to_list(Node)]) of
95-
ok -> ok;
96-
{error, _, <<"Error:\n{:not_a_cluster_node, ~c\"The node selected is not in the cluster.\"}">>} -> ok
97-
end,
98-
ok = rabbit_control_helper:command(reset, Node),
99-
ok = rabbit_control_helper:command(start_app, Node),
100-
reset_nodes(Nodes, Leader).
101-
102-
103120
%% -------------------------------------------------------------------
104121
%% Testcases.
105122
%% -------------------------------------------------------------------
@@ -134,6 +151,10 @@ auto_grow(Config) ->
134151
end).
135152

136153
auto_grow_drained_node(Config) ->
154+
%% NOTE: with large Interval (larger than wait_until) test will fail.
155+
%% the reason is that entering/exiting drain state does not emit events
156+
%% and even if they did via gen_event, they going to be only local to that node.
157+
%% so reconciliator has no choice but to wait full Interval
137158
[Server0, Server1, Server2] =
138159
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
139160
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
@@ -169,7 +190,6 @@ auto_grow_drained_node(Config) ->
169190
3 =:= length(M)
170191
end).
171192

172-
173193
auto_shrink(Config) ->
174194
[Server0, Server1, Server2] =
175195
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -186,6 +206,18 @@ auto_shrink(Config) ->
186206
Server1}),
187207
3 =:= length(M)
188208
end),
209+
210+
%% QQ member reconciliation does not act immediately but rather after a scheduled delay.
211+
%% So if this test wants to test that the reconciliator reacts to, say, node_down or a similar event,
212+
%% it has to wait at least a trigger_interval ms to pass before removing node. Otherwise
213+
%% the shrink effect would come from the previous trigger.
214+
%%
215+
%% When a `queue_created` trigger set up a timer to fire after a trigger_interval, the queue has 3 members
216+
%% and stop_app executes much quicker than the trigger_interval. Therefore the number of members
217+
%% will be updated even without a node_down event.
218+
219+
timer:sleep(rabbit_ct_helpers:get_config(Config, shrink_timeout, 0)),
220+
189221
ok = rabbit_control_helper:command(stop_app, Server2),
190222
ok = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_cluster, forget_member,
191223
[Server2, false]),
@@ -196,7 +228,27 @@ auto_shrink(Config) ->
196228
2 =:= length(M)
197229
end).
198230

231+
%% -------------------------------------------------------------------
232+
%% Helpers.
233+
%% -------------------------------------------------------------------
199234

235+
merge_app_env(Config) ->
236+
rabbit_ct_helpers:merge_app_env(
237+
rabbit_ct_helpers:merge_app_env(Config,
238+
{rabbit, [{core_metrics_gc_interval, 100}]}),
239+
{ra, [{min_wal_roll_over_interval, 30000}]}).
240+
241+
reset_nodes([], _Leader) ->
242+
ok;
243+
reset_nodes([Node| Nodes], Leader) ->
244+
ok = rabbit_control_helper:command(stop_app, Node),
245+
case rabbit_control_helper:command(forget_cluster_node, Leader, [atom_to_list(Node)]) of
246+
ok -> ok;
247+
{error, _, <<"Error:\n{:not_a_cluster_node, ~c\"The node selected is not in the cluster.\"}">>} -> ok
248+
end,
249+
ok = rabbit_control_helper:command(reset, Node),
250+
ok = rabbit_control_helper:command(start_app, Node),
251+
reset_nodes(Nodes, Leader).
200252

201253
add_server_to_cluster(Server, Leader) ->
202254
ok = rabbit_control_helper:command(stop_app, Server),

0 commit comments

Comments
 (0)