Skip to content

Commit f5555bf

Browse files
committed
Add test partitions for stream SAC coordinator
1 parent 8251007 commit f5555bf

File tree

3 files changed

+299
-3
lines changed

3 files changed

+299
-3
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@
6868
%% for testing and debugging
6969
-export([eval_listeners/3,
7070
replay/1,
71-
state/0]).
71+
state/0,
72+
sac_state/0]).
7273

7374
-import(rabbit_queue_type_util, [
7475
erpc_call/5
@@ -278,6 +279,16 @@ state() ->
278279
Any
279280
end.
280281

282+
%% for debugging
283+
sac_state() ->
284+
case state() of
285+
S when is_record(S, ?MODULE) ->
286+
sac_state(S);
287+
R ->
288+
R
289+
end.
290+
291+
281292
writer_pid(StreamId) when is_list(StreamId) ->
282293
MFA = {?MODULE, query_writer_pid, [StreamId]},
283294
query_pid(StreamId, MFA).

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
-define(DISCONN_ACT, {?DISCONNECTED, ?ACTIVE}).
7171
-define(FORG_ACT, {?FORGOTTTEN, ?ACTIVE}).
7272

73+
-define(DISCONNECTED_TIMEOUT_MS, 60_000).
74+
7375
%% Single Active Consumer API
7476
-spec register_consumer(binary(),
7577
binary(),
@@ -723,8 +725,8 @@ handle_connection_node_disconnected(ConnPid,
723725
handle_group_after_connection_node_disconnected(
724726
ConnPid, Acc, G)
725727
end, State1, Groups),
726-
%% TODO configure timeout to forget connection from disconnected node
727-
T = 60_000,
728+
T = application:get_env(rabbit, stream_sac_disconnected_timeout,
729+
?DISCONNECTED_TIMEOUT_MS),
728730
{State2, [{timer, {sac, node_disconnected,
729731
#{connection_pid => ConnPid}}, T}]}
730732
end.
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/en-US/MPL/2.0/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
14+
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
15+
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
16+
%%
17+
18+
-module(rabbit_stream_partitions_SUITE).
19+
20+
-include_lib("eunit/include/eunit.hrl").
21+
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
22+
-include_lib("rabbit/src/rabbit_stream_sac_coordinator.hrl").
23+
24+
-compile(nowarn_export_all).
25+
-compile(export_all).
26+
27+
-define(TRSPT, gen_tcp).
28+
-define(CORR_ID, 1).
29+
-define(SAC_STATE, rabbit_stream_sac_coordinator).
30+
31+
all() ->
32+
[{group, cluster}].
33+
34+
groups() ->
35+
[{cluster, [],
36+
[simple_sac_consumer_should_get_disconnected_on_partition]}
37+
].
38+
39+
init_per_suite(Config) ->
40+
case rabbit_ct_helpers:is_mixed_versions() of
41+
true ->
42+
{skip, "mixed version clusters are not supported"};
43+
_ ->
44+
rabbit_ct_helpers:log_environment(),
45+
Config
46+
end.
47+
48+
end_per_suite(Config) ->
49+
Config.
50+
51+
init_per_group(Group, Config) ->
52+
Config1 = rabbit_ct_helpers:set_config(
53+
Config, [{rmq_nodes_clustered, true},
54+
{rmq_nodes_count, 3},
55+
{rmq_nodename_suffix, Group},
56+
{tcp_ports_base}
57+
]),
58+
rabbit_ct_helpers:run_setup_steps(
59+
Config1,
60+
[fun rabbit_ct_broker_helpers:configure_dist_proxy/1,
61+
fun(StepConfig) ->
62+
rabbit_ct_helpers:merge_app_env(StepConfig,
63+
{aten,
64+
[{poll_interval,
65+
1000}]})
66+
end]
67+
% fun(StepConfig) ->
68+
% rabbit_ct_helpers:merge_app_env(StepConfig,
69+
% {rabbit,
70+
% [{stream_sac_disconnected_timeout,
71+
% 2000}]})
72+
% end]
73+
++ rabbit_ct_broker_helpers:setup_steps()).
74+
75+
end_per_group(_, Config) ->
76+
rabbit_ct_helpers:run_steps(Config,
77+
rabbit_ct_broker_helpers:teardown_steps()).
78+
79+
init_per_testcase(TestCase, Config) ->
80+
rabbit_ct_helpers:testcase_started(Config, TestCase).
81+
82+
end_per_testcase(TestCase, Config) ->
83+
rabbit_ct_helpers:testcase_finished(Config, TestCase).
84+
85+
simple_sac_consumer_should_get_disconnected_on_partition(Config) ->
86+
T = ?TRSPT,
87+
S = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
88+
{ok, S0, C0} = stream_test_utils:connect(Config, 0),
89+
{ok, S1, C1} = stream_test_utils:connect(Config, 1),
90+
{ok, S2, C2} = stream_test_utils:connect(Config, 2),
91+
92+
create_stream(Config, S),
93+
wait_for_members(S0, C0, S, 3),
94+
95+
C0_01 = register_sac(S0, C0, S),
96+
C0_02 = receive_consumer_update(S0, C0_01),
97+
98+
C1_01 = register_sac(S1, C1, S),
99+
C2_01 = register_sac(S2, C2, S),
100+
101+
Members = stream_members(Config, S),
102+
L = leader(Members),
103+
[F1, F2] = followers(Members),
104+
105+
Consumers = query_consumers(Config, S),
106+
assertSize(3, Consumers),
107+
assertConsumersConnected(Consumers),
108+
109+
rabbit_ct_broker_helpers:block_traffic_between(F1, L),
110+
rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
111+
112+
wait_for_disconnected_consumer(Config, S),
113+
114+
rabbit_ct_broker_helpers:allow_traffic_between(F1, L),
115+
rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
116+
117+
wait_for_all_consumers_connected(Config, S),
118+
assertConsumersConnected(query_consumers(Config, S)),
119+
120+
delete_stream(Config, S),
121+
122+
{_, _} = receive_commands(T, S0, C0),
123+
{_, _} = receive_commands(T, S1, C1),
124+
{_, _} = receive_commands(T, S2, C2),
125+
126+
{ok, _} = stream_test_utils:close(S0, C0),
127+
{ok, _} = stream_test_utils:close(S1, C1),
128+
{ok, _} = stream_test_utils:close(S2, C2),
129+
ok.
130+
131+
leader(Members) ->
132+
maps:fold(fun(Node, {_, writer}, _Acc) ->
133+
Node;
134+
(_, _, Acc) ->
135+
Acc
136+
end, undefined, Members).
137+
138+
followers(Members) ->
139+
maps:fold(fun(Node, {_, replica}, Acc) ->
140+
[Node | Acc];
141+
(_, _, Acc) ->
142+
Acc
143+
end, [], Members).
144+
145+
stream_members(Config, Stream) ->
146+
{ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [Stream, <<"/">>]),
147+
#{name := StreamId} = amqqueue:get_type_state(Q),
148+
State = rpc(Config, rabbit_stream_coordinator, state, []),
149+
{ok, Members} = rpc(Config, rabbit_stream_coordinator, query_members,
150+
[StreamId, State]),
151+
Members.
152+
153+
create_stream(Config, St) ->
154+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
155+
{ok, C1} = stream_test_utils:create_stream(S, C0, St),
156+
{ok, _} = stream_test_utils:close(S, C1).
157+
158+
delete_stream(Config, St) ->
159+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
160+
{ok, C1} = stream_test_utils:delete_stream(S, C0, St),
161+
{ok, _} = stream_test_utils:close(S, C1).
162+
163+
register_sac(S, C0, St) ->
164+
SacSubscribeFrame = request({subscribe, 0, St,
165+
first, 1,
166+
#{<<"single-active-consumer">> => <<"true">>,
167+
<<"name">> => name()}}),
168+
T = ?TRSPT,
169+
ok = T:send(S, SacSubscribeFrame),
170+
{Cmd1, C1} = receive_commands(T, S, C0),
171+
?assertMatch({response, ?CORR_ID, {subscribe, ?RESPONSE_CODE_OK}},
172+
Cmd1),
173+
C1.
174+
175+
receive_consumer_update(S, C0) ->
176+
{Cmd, C1} = receive_commands(?TRSPT, S, C0),
177+
?assertMatch({request, _CorrId, {consumer_update, _SubId, _Status}},
178+
Cmd),
179+
C1.
180+
181+
unsubscribe(S, C0) ->
182+
{ok, C1} = stream_test_utils:unsubscribe(S, C0, sub_id()),
183+
C1.
184+
185+
query_consumers(Config, Stream) ->
186+
Key = group_key(Stream),
187+
#?SAC_STATE{groups = #{Key := #group{consumers = Consumers}}} =
188+
rpc(Config, rabbit_stream_coordinator, sac_state, []),
189+
Consumers.
190+
191+
coordinator_state(Config) ->
192+
rpc(Config, rabbit_stream_coordinator, state, []).
193+
194+
rpc(Config, M, F, A) ->
195+
rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A).
196+
197+
group_key(Stream) ->
198+
{<<"/">>, Stream, name()}.
199+
200+
request(Cmd) ->
201+
request(?CORR_ID, Cmd).
202+
203+
request(CorrId, Cmd) ->
204+
rabbit_stream_core:frame({request, CorrId, Cmd}).
205+
206+
receive_commands(Transport, S, C) ->
207+
stream_test_utils:receive_stream_commands(Transport, S, C).
208+
209+
sub_id() ->
210+
0.
211+
212+
name() ->
213+
<<"app">>.
214+
215+
wait_for_members(S, C, St, ExpectedCount) ->
216+
T = ?TRSPT,
217+
GetStreamNodes =
218+
fun() ->
219+
MetadataFrame = request({metadata, [St]}),
220+
ok = gen_tcp:send(S, MetadataFrame),
221+
{CmdMetadata, _} = receive_commands(T, S, C),
222+
{response, 1,
223+
{metadata, _Nodes, #{St := {Leader = {_H, _P}, Replicas}}}} =
224+
CmdMetadata,
225+
[Leader | Replicas]
226+
end,
227+
rabbit_ct_helpers:await_condition(fun() ->
228+
length(GetStreamNodes()) == ExpectedCount
229+
end).
230+
231+
wait_for_disconnected_consumer(Config, Stream) ->
232+
rabbit_ct_helpers:await_condition(
233+
fun() ->
234+
Cs = query_consumers(Config, Stream),
235+
lists:any(fun(#consumer{status = {disconnected, _}}) ->
236+
true;
237+
(_) ->
238+
false
239+
end, Cs)
240+
end).
241+
242+
wait_for_forgotten_consumer(Config, Stream) ->
243+
rabbit_ct_helpers:await_condition(
244+
fun() ->
245+
Cs = query_consumers(Config, Stream),
246+
lists:any(fun(#consumer{status = {forgotten, _}}) ->
247+
true;
248+
(_) ->
249+
false
250+
end, Cs)
251+
end).
252+
253+
wait_for_all_consumers_connected(Config, Stream) ->
254+
rabbit_ct_helpers:await_condition(
255+
fun() ->
256+
Cs = query_consumers(Config, Stream),
257+
lists:all(fun(#consumer{status = {connected, _}}) ->
258+
true;
259+
(_) ->
260+
false
261+
end, Cs)
262+
end).
263+
264+
265+
assertConsumersConnected(Consumers) when length(Consumers) > 0 ->
266+
lists:foreach(fun(#consumer{status = St}) ->
267+
?assertMatch({connected, _}, St,
268+
"Consumer should be connected")
269+
end, Consumers);
270+
assertConsumersConnected(_) ->
271+
?assert(false, "The consumer list is empty").
272+
273+
274+
assertSize(Expected, []) ->
275+
?assertEqual(Expected, 0);
276+
assertSize(Expected, Map) when is_map(Map) ->
277+
?assertEqual(Expected, maps:size(Map));
278+
assertSize(Expected, List) when is_list(List) ->
279+
?assertEqual(Expected, length(List)).
280+
281+
assertEmpty(Data) ->
282+
assertSize(0, Data).
283+

0 commit comments

Comments
 (0)