Skip to content

Commit 7432186

Browse files
committed
Make SAC disconnect timeout configurable
1 parent c06807c commit 7432186

5 files changed

+126
-22
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ start_coordinator_cluster() ->
512512
"initial machine version ~b",
513513
[Nodes, MinVersion]),
514514
case ra:start_cluster(?RA_SYSTEM,
515-
[make_ra_conf(Node, Nodes, MinVersion)
515+
[make_system_conf(Node, Nodes, MinVersion)
516516
|| Node <- Nodes]) of
517517
{ok, Started, _} ->
518518
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
@@ -536,7 +536,11 @@ version() -> 5.
536536
%% Returns this module whatever argument is supplied.
%% NOTE(review): presumably the ra_machine which_module/1 callback, whose
%% argument would be a machine version - confirm against the behaviour.
which_module(_Ignored) ->
    ?MODULE.
538538

539-
init(_Conf) ->
539+
%% Builds the initial coordinator state.
%%
%% When the configuration map carries a `machine_version' of 5 or above, the
%% single-active-consumer (SAC) state is created by
%% rabbit_stream_sac_coordinator:init_state/1 from the optional `sac_conf'
%% entry (an empty map when the entry is absent). Any other argument falls
%% back to the v4 SAC module, whose init_state/0 takes no configuration.
init(Conf) ->
    SacState =
        case Conf of
            #{machine_version := MacVersion} when MacVersion >= 5 ->
                SacConf = maps:get(sac_conf, Conf, #{}),
                rabbit_stream_sac_coordinator:init_state(SacConf);
            _ ->
                rabbit_stream_sac_coordinator_v4:init_state()
        end,
    #?MODULE{single_active_consumer = SacState}.
541545

542546
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
@@ -799,7 +803,8 @@ all_member_nodes(Streams) ->
799803
end, #{}, Streams)).
800804

801805
tick(_Ts, _State) ->
802-
[{aux, maybe_resize_coordinator_cluster}].
806+
[{aux, maybe_resize_coordinator_cluster},
807+
{aux, maybe_update_sac_configuration}].
803808

804809
members() ->
805810
%% TODO: this can be replaced with a ra_leaderboard
@@ -876,15 +881,27 @@ maybe_handle_stale_nodes(MemberNodes, ExpectedNodes,
876881
rabbit_log:debug("Stale nodes detected in stream SAC "
877882
"coordinator: ~w. Purging state.",
878883
[Stale]),
879-
Mod = sac_module(MachineVersion),
880-
ra:pipeline_command(LeaderPid, Mod:make_purge_nodes(Stale)),
884+
ra:pipeline_command(LeaderPid, sac_make_purge_nodes(Stale)),
881885
ok;
882886
_ ->
883887
ok
884888
end;
885889
maybe_handle_stale_nodes(_, _, _, _) ->
886890
ok.
887891

892+
%% Compares the SAC configuration held in the machine state with what
%% sac_check_conf_change/1 reports and, when a new configuration is reported,
%% pipelines an update command to the leader. Does nothing for machine
%% versions below 5. Always returns `ok'.
maybe_update_sac_configuration(RaAux, MacVersion) when MacVersion >= 5 ->
    #?MODULE{single_active_consumer = Sac} = ra_aux:machine_state(RaAux),
    pipeline_sac_conf_change(sac_check_conf_change(Sac), RaAux);
maybe_update_sac_configuration(_RaAux, _MacVersion) ->
    ok.

%% Sends the update command only for a `{new, Conf}' result; every other
%% result is ignored.
pipeline_sac_conf_change({new, NewConf}, RaAux) ->
    ra:pipeline_command(ra_aux:leader_id(RaAux),
                        sac_make_update_conf(NewConf)),
    ok;
pipeline_sac_conf_change(_NoChange, _RaAux) ->
    ok.
904+
888905
add_member(Members, Node) ->
889906
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
890907
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
@@ -967,6 +984,11 @@ handle_aux(leader, _, maybe_resize_coordinator_cluster,
967984
AuxState, RaAux) ->
968985
%% Coordinator resizing is still happening, let's ignore this tick event
969986
{no_reply, AuxState, RaAux};
987+
handle_aux(leader, _, maybe_update_sac_configuration,
988+
AuxState, RaAux) ->
989+
MachineVersion = ra_aux:effective_machine_version(RaAux),
990+
maybe_update_sac_configuration(RaAux, MachineVersion),
991+
{no_reply, AuxState, RaAux};
970992
handle_aux(leader, _, {down, Pid, _},
971993
#aux{resizer = Pid} = Aux, RaAux) ->
972994
%% Coordinator resizing has finished
@@ -1328,6 +1350,11 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
13281350
%% Tags an event with `stream_coordinator_event' and the server id it is
%% associated with, yielding a 3-tuple.
format_ra_event(ServerId, Event) ->
    {stream_coordinator_event, ServerId, Event}.
13301352

1353+
%% Returns the configuration produced by make_ra_conf/3 with one extra
%% entry, `sac_conf', holding the result of make_sac_conf/1 for the same
%% minimum machine version.
make_system_conf(Node, Nodes, MinMacVersion) ->
    %% Build the base configuration first, as the SAC part is layered on it.
    BaseConf = make_ra_conf(Node, Nodes, MinMacVersion),
    maps:put(sac_conf, make_sac_conf(MinMacVersion), BaseConf).
1357+
13311358
make_ra_conf(Node, Nodes, MinMacVersion) ->
13321359
UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
13331360
Formatter = {?MODULE, format_ra_event, []},
@@ -1346,6 +1373,11 @@ make_ra_conf(Node, Nodes, MinMacVersion) ->
13461373
initial_machine_version => MinMacVersion,
13471374
ra_event_formatter => Formatter}.
13481375

1376+
%% SAC configuration for a given minimum machine version: the result of
%% sac_make_conf/0 from version 5 onwards, an empty map before that.
make_sac_conf(MinMacVersion) ->
    case MinMacVersion >= 5 of
        true ->
            sac_make_conf();
        false ->
            #{}
    end.
1380+
13491381
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,
13501382
members = Members0}) ->
13511383
Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted ->
@@ -2420,3 +2452,15 @@ maps_to_list(M) ->
24202452

24212453
%% Runs QueryFun through ra:local_query/3 against the server identified by
%% this module's name on the local node, with an `infinity' timeout.
ra_local_query(QueryFun) ->
    LocalServerId = {?MODULE, node()},
    ra:local_query(LocalServerId, QueryFun, infinity).
2455+
2456+
%% Delegates to rabbit_stream_sac_coordinator:make_conf/0.
sac_make_conf() ->
    rabbit_stream_sac_coordinator:make_conf().
2458+
2459+
%% Delegates to rabbit_stream_sac_coordinator:make_purge_nodes/1 to build the
%% purge command for the given nodes.
sac_make_purge_nodes(Nodes) ->
    rabbit_stream_sac_coordinator:make_purge_nodes(Nodes).
2461+
2462+
%% Delegates to rabbit_stream_sac_coordinator:make_update_conf/1 to build the
%% configuration-update command.
sac_make_update_conf(Conf) ->
    rabbit_stream_sac_coordinator:make_update_conf(Conf).
2464+
2465+
%% Delegates to rabbit_stream_sac_coordinator:check_conf_change/1 for the
%% given SAC state.
sac_check_conf_change(SacState) ->
    rabbit_stream_sac_coordinator:check_conf_change(SacState).

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
#command_unregister_consumer{} |
2323
#command_activate_consumer{} |
2424
#command_connection_reconnected{} |
25-
#command_purge_nodes{}.
25+
#command_purge_nodes{} |
26+
#command_update_conf{}.
2627

2728
-opaque state() :: #?MODULE{}.
2829

@@ -37,7 +38,7 @@
3738
group_consumers/4,
3839
connection_reconnected/1]).
3940
-export([apply/2,
40-
init_state/0,
41+
init_state/1,
4142
send_message/2,
4243
ensure_monitors/4,
4344
handle_connection_down/2,
@@ -47,8 +48,11 @@
4748
consumer_groups/3,
4849
group_consumers/5,
4950
overview/1,
50-
import_state/2]).
51-
-export([make_purge_nodes/1]).
51+
import_state/2,
52+
make_conf/0,
53+
check_conf_change/1]).
54+
-export([make_purge_nodes/1,
55+
make_update_conf/1]).
5256

5357
%% exported for unit tests only
5458
-ifdef(TEST).
@@ -70,6 +74,8 @@
7074
-define(DISCONN_ACT, {?DISCONNECTED, ?ACTIVE}).
7175
-define(FORG_ACT, {?FORGOTTTEN, ?ACTIVE}).
7276

77+
-define(DISCONNECTED_TIMEOUT_APP_KEY, stream_sac_disconnected_timeout).
78+
-define(DISCONNECTED_TIMEOUT_CONF_KEY, disconnected_timeout).
7379
-define(DISCONNECTED_TIMEOUT_MS, 60_000).
7480

7581
%% Single Active Consumer API
@@ -195,9 +201,12 @@ overview(#?MODULE{groups = Groups}) ->
195201
Groups),
196202
#{num_groups => map_size(Groups), groups => GroupsOverview}.
197203

198-
-spec init_state() -> state().
199-
init_state() ->
200-
#?MODULE{groups = #{}, pids_groups = #{}}.
204+
%% Creates an empty SAC state (no groups, no pid-to-group index) whose
%% configuration holds a single entry: the disconnected timeout taken from
%% Conf, or ?DISCONNECTED_TIMEOUT_MS when Conf does not provide one. Any
%% other key present in Conf is not carried over.
-spec init_state(map()) -> state().
init_state(Conf) ->
    Defaults = #{?DISCONNECTED_TIMEOUT_CONF_KEY => ?DISCONNECTED_TIMEOUT_MS},
    Provided = maps:with([?DISCONNECTED_TIMEOUT_CONF_KEY], Conf),
    #?MODULE{groups = #{},
             pids_groups = #{},
             conf = maps:merge(Defaults, Provided)}.
201210

202211
-spec apply(command(), state()) ->
203212
{state(), term(), ra_machine:effects()}.
@@ -319,7 +328,9 @@ apply(#command_purge_nodes{nodes = Nodes}, State0) ->
319328
{S1, Eff1} = purge_node(N, S0),
320329
{S1, Eff1 ++ Eff0}
321330
end, {State0, []}, Nodes),
322-
{State1, ok, Eff}.
331+
{State1, ok, Eff};
332+
apply(#command_update_conf{conf = NewConf}, State) ->
333+
{State#?MODULE{conf = NewConf}, ok, []}.
323334

324335
purge_node(Node, #?MODULE{groups = Groups0} = State0) ->
325336
PidsGroups = compute_node_pid_group_dependencies(Node, Groups0),
@@ -725,8 +736,7 @@ handle_connection_node_disconnected(ConnPid,
725736
handle_group_after_connection_node_disconnected(
726737
ConnPid, Acc, G)
727738
end, State1, Groups),
728-
T = application:get_env(rabbit, stream_sac_disconnected_timeout,
729-
?DISCONNECTED_TIMEOUT_MS),
739+
T = disconnected_timeout(State2),
730740
{State2, [{timer, {sac, node_disconnected,
731741
#{connection_pid => ConnPid}}, T}]}
732742
end.
@@ -850,12 +860,43 @@ handle_group_after_connection_node_disconnected(ConnPid,
850860
-spec import_state(ra_machine:version(), map()) -> state().
851861
import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
852862
#?MODULE{groups = map_to_groups(Groups),
853-
pids_groups = map_to_pids_groups(PidsGroups)}.
863+
pids_groups = map_to_pids_groups(PidsGroups),
864+
conf = #{disconnected_timeout => ?DISCONNECTED_TIMEOUT_MS}}.
865+
866+
%% Builds a SAC configuration map whose only entry is the disconnected
%% timeout returned by lookup_disconnected_timeout/0.
-spec make_conf() -> conf().
make_conf() ->
    Timeout = lookup_disconnected_timeout(),
    #{?DISCONNECTED_TIMEOUT_CONF_KEY => Timeout}.
869+
870+
%% Tells whether the configuration stored in the SAC state still agrees with
%% the disconnected timeout returned by lookup_disconnected_timeout/0.
%%
%% Returns `unchanged' only when the stored configuration is a map holding
%% exactly the looked-up value. Everything else yields `{new, Conf}' with a
%% configuration carrying the looked-up value: a missing key, a different
%% value, or a `conf' field that is not a map at all (for instance
%% `undefined' when the record was built without setting the field).
%% Reporting the non-map case as a change lets the caller repair the state
%% instead of silently leaving the configured timeout ignored.
-spec check_conf_change(state()) -> {new, conf()} | unchanged.
check_conf_change(#?MODULE{conf = Conf}) ->
    DisconTimeout = lookup_disconnected_timeout(),
    case Conf of
        %% DisconTimeout is already bound, so this is an exact-equality match.
        #{?DISCONNECTED_TIMEOUT_CONF_KEY := DisconTimeout} ->
            unchanged;
        _ ->
            {new, #{?DISCONNECTED_TIMEOUT_CONF_KEY => DisconTimeout}}
    end.
854882

855883
%% Builds the purge-nodes command for the given nodes and wraps it with
%% wrap_cmd/1; per the spec the result has the shape `{sac, Command}'.
-spec make_purge_nodes([node()]) -> {sac, command()}.
make_purge_nodes(Nodes) ->
    PurgeCmd = #command_purge_nodes{nodes = Nodes},
    wrap_cmd(PurgeCmd).
858886

887+
%% Builds the configuration-update command carrying Conf and wraps it with
%% wrap_cmd/1; per the spec the result has the shape `{sac, Command}'.
-spec make_update_conf(conf()) -> {sac, command()}.
make_update_conf(Conf) ->
    UpdateCmd = #command_update_conf{conf = Conf},
    wrap_cmd(UpdateCmd).
890+
891+
%% Reads the disconnected timeout from the `rabbit' application environment
%% (key ?DISCONNECTED_TIMEOUT_APP_KEY), falling back to
%% ?DISCONNECTED_TIMEOUT_MS when the key is not set.
lookup_disconnected_timeout() ->
    case application:get_env(rabbit, ?DISCONNECTED_TIMEOUT_APP_KEY) of
        {ok, Configured} ->
            Configured;
        undefined ->
            ?DISCONNECTED_TIMEOUT_MS
    end.
894+
895+
%% Disconnected timeout to use for a given state: the value stored in the
%% state's configuration map when there is one, ?DISCONNECTED_TIMEOUT_MS for
%% any other argument.
disconnected_timeout(State) ->
    case State of
        #?MODULE{conf = #{?DISCONNECTED_TIMEOUT_CONF_KEY := Timeout}} ->
            Timeout;
        _ ->
            ?DISCONNECTED_TIMEOUT_MS
    end.
899+
859900
map_to_groups(Groups) when is_map(Groups) ->
860901
maps:fold(fun(K, V, Acc) ->
861902
Acc#{K => map_to_group(V)}

deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
-type owner() :: binary().
2525
-type consumer_status() :: active | waiting | deactivating.
2626
-type consumer_connectivity() :: connected | disconnected | forgotten.
27+
-type conf() :: map().
2728

2829
-record(consumer,
2930
{pid :: pid(),
@@ -35,6 +36,7 @@
3536
-record(rabbit_stream_sac_coordinator,
3637
{groups :: groups(),
3738
pids_groups :: pids_groups(),
39+
conf :: conf(),
3840
%% future extensibility
3941
reserved_1,
4042
reserved_2}).
@@ -66,3 +68,5 @@
6668
{pid :: connection_pid()}).
6769
-record(command_purge_nodes,
6870
{nodes :: [node()]}).
71+
-record(command_update_conf,
72+
{conf :: conf()}).

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ end_per_testcase(_TestCase, _Config) ->
6161
meck:unload(),
6262
ok.
6363

64+
%% check_conf_change/1 must report a new configuration carrying the default
%% timeout when the stored one is missing or different, and `unchanged' when
%% the stored one already equals the default.
%% NOTE(review): the expected 60_000 assumes no override of the timeout is
%% set in the application environment while this test runs - confirm.
check_conf_test(_) ->
    K = disconnected_timeout,
    Def = 60_000,
    CheckWith = fun(Conf) ->
                        ?MOD:check_conf_change(state_with_conf(Conf))
                end,
    %% key absent
    ?assertMatch({new, #{K := Def}}, CheckWith(#{})),
    %% key present with another value
    ?assertMatch({new, #{K := Def}}, CheckWith(#{K => 42})),
    %% key present with the expected value
    ?assertMatch(unchanged, CheckWith(#{K => Def})),
    ok.
74+
6475
simple_sac_test(_) ->
6576
Stream = <<"stream">>,
6677
ConsumerName = <<"app">>,
@@ -1503,6 +1514,9 @@ state(Groups) ->
15031514
%% Test helper: a ?STATE record with the given groups and pid-to-group index;
%% every other field is left unset by this constructor.
state(Groups, PidsGroups) ->
    #?STATE{pids_groups = PidsGroups,
            groups = Groups}.
15051516

1517+
%% Test helper: a ?STATE record carrying only the given configuration; every
%% other field is left unset by this constructor.
state_with_conf(Conf) ->
    State = #?STATE{conf = Conf},
    State.
1519+
15061520
register_consumer_command(Stream,
15071521
PartitionIndex,
15081522
ConsumerName,

deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ init_per_group(Group, Config) ->
6363
{aten,
6464
[{poll_interval,
6565
1000}]})
66+
end,
67+
fun(StepConfig) ->
68+
rabbit_ct_helpers:merge_app_env(StepConfig,
69+
{rabbit,
70+
[{stream_sac_disconnected_timeout,
71+
2000}]})
6672
end]
67-
% fun(StepConfig) ->
68-
% rabbit_ct_helpers:merge_app_env(StepConfig,
69-
% {rabbit,
70-
% [{stream_sac_disconnected_timeout,
71-
% 2000}]})
72-
% end]
7373
++ rabbit_ct_broker_helpers:setup_steps()).
7474

7575
end_per_group(_, Config) ->
@@ -110,6 +110,7 @@ simple_sac_consumer_should_get_disconnected_on_partition(Config) ->
110110
rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
111111

112112
wait_for_disconnected_consumer(Config, S),
113+
wait_for_forgotten_consumer(Config, S),
113114

114115
rabbit_ct_broker_helpers:allow_traffic_between(F1, L),
115116
rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),

0 commit comments

Comments
 (0)