Skip to content

Commit 8cc26c4

Browse files
authored
Merge pull request #13857 from rabbitmq/stream-coordinator-fix-partition-index-conflict
Fix partition index conflict in stream SAC coordinator
2 parents 1c0539f + cad8b70 commit 8cc26c4

File tree

4 files changed

+197
-119
lines changed

4 files changed

+197
-119
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -198,21 +198,23 @@ apply(#command_register_consumer{vhost = VirtualHost,
198198
owner = Owner,
199199
subscription_id = SubscriptionId},
200200
#?MODULE{groups = StreamGroups0} = State) ->
201-
StreamGroups1 =
202-
maybe_create_group(VirtualHost,
201+
case maybe_create_group(VirtualHost,
203202
Stream,
204203
PartitionIndex,
205204
ConsumerName,
206-
StreamGroups0),
207-
208-
do_register_consumer(VirtualHost,
209-
Stream,
210-
PartitionIndex,
211-
ConsumerName,
212-
ConnectionPid,
213-
Owner,
214-
SubscriptionId,
215-
State#?MODULE{groups = StreamGroups1});
205+
StreamGroups0) of
206+
{ok, StreamGroups1} ->
207+
do_register_consumer(VirtualHost,
208+
Stream,
209+
PartitionIndex,
210+
ConsumerName,
211+
ConnectionPid,
212+
Owner,
213+
SubscriptionId,
214+
State#?MODULE{groups = StreamGroups1});
215+
{error, Error} ->
216+
{State, {error, Error}, []}
217+
end;
216218
apply(#command_unregister_consumer{vhost = VirtualHost,
217219
stream = Stream,
218220
consumer_name = ConsumerName,
@@ -644,12 +646,15 @@ maybe_create_group(VirtualHost,
644646
ConsumerName,
645647
StreamGroups) ->
646648
case StreamGroups of
647-
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
648-
StreamGroups;
649+
#{{VirtualHost, Stream, ConsumerName} := #group{partition_index = PI}}
650+
when PI =/= PartitionIndex ->
651+
{error, partition_index_conflict};
652+
#{{VirtualHost, Stream, ConsumerName} := _} ->
653+
{ok, StreamGroups};
649654
SGS ->
650-
maps:put({VirtualHost, Stream, ConsumerName},
651-
#group{consumers = [], partition_index = PartitionIndex},
652-
SGS)
655+
{ok, maps:put({VirtualHost, Stream, ConsumerName},
656+
#group{consumers = [], partition_index = PartitionIndex},
657+
SGS)}
653658
end.
654659

655660
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,20 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
503503
Groups),
504504
ok.
505505

506+
register_consumer_with_different_partition_index_should_return_error_test(_) ->
507+
Stream = <<"stream">>,
508+
ConsumerName = <<"app">>,
509+
ConnectionPid = self(),
510+
Command0 =
511+
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0),
512+
State0 = state(),
513+
{State1, {ok, true}, _} =
514+
rabbit_stream_sac_coordinator:apply(Command0, State0),
515+
Command1 =
516+
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
517+
{_, {error, partition_index_conflict}, []} =
518+
rabbit_stream_sac_coordinator:apply(Command1, State1).
519+
506520
assertSize(Expected, []) ->
507521
?assertEqual(Expected, 0);
508522
assertSize(Expected, Map) when is_map(Map) ->

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 113 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1927,21 +1927,17 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta
19271927
{C, State};
19281928
handle_frame_post_auth(Transport,
19291929
{ok, #stream_connection{
1930-
name = ConnName,
1931-
socket = Socket,
19321930
stream_subscriptions = StreamSubscriptions,
19331931
virtual_host = VirtualHost,
1934-
user = User,
1935-
send_file_oct = SendFileOct,
1936-
transport = ConnTransport} = Connection},
1937-
#stream_connection_state{consumers = Consumers} = State,
1932+
user = User} = Connection},
1933+
State,
19381934
{request, CorrelationId,
19391935
{subscribe,
19401936
SubscriptionId,
19411937
Stream,
19421938
OffsetSpec,
1943-
Credit,
1944-
Properties}}) ->
1939+
_Credit,
1940+
Properties}} = Request) ->
19451941
QueueResource =
19461942
#resource{name = Stream,
19471943
kind = queue,
@@ -2004,89 +2000,9 @@ handle_frame_post_auth(Transport,
20042000
increase_protocol_counter(?PRECONDITION_FAILED),
20052001
{Connection, State};
20062002
_ ->
2007-
Log = case Sac of
2008-
true ->
2009-
undefined;
2010-
false ->
2011-
init_reader(ConnTransport,
2012-
LocalMemberPid,
2013-
QueueResource,
2014-
SubscriptionId,
2015-
Properties,
2016-
OffsetSpec)
2017-
end,
2018-
2019-
ConsumerCounters =
2020-
atomics:new(2, [{signed, false}]),
2021-
2022-
response_ok(Transport,
2023-
Connection,
2024-
subscribe,
2025-
CorrelationId),
2026-
2027-
Active =
2028-
maybe_register_consumer(VirtualHost,
2029-
Stream,
2030-
ConsumerName,
2031-
ConnName,
2032-
SubscriptionId,
2033-
Properties,
2034-
Sac),
2035-
2036-
ConsumerConfiguration =
2037-
#consumer_configuration{member_pid =
2038-
LocalMemberPid,
2039-
subscription_id
2040-
=
2041-
SubscriptionId,
2042-
socket = Socket,
2043-
stream = Stream,
2044-
offset =
2045-
OffsetSpec,
2046-
counters =
2047-
ConsumerCounters,
2048-
properties =
2049-
Properties,
2050-
active =
2051-
Active},
2052-
SendLimit = Credit div 2,
2053-
ConsumerState =
2054-
#consumer{configuration =
2055-
ConsumerConfiguration,
2056-
log = Log,
2057-
send_limit = SendLimit,
2058-
credit = Credit},
2059-
2060-
Connection1 =
2061-
maybe_monitor_stream(LocalMemberPid,
2062-
Stream,
2063-
Connection),
2064-
2065-
State1 =
2066-
maybe_dispatch_on_subscription(Transport,
2067-
State,
2068-
ConsumerState,
2069-
Connection1,
2070-
Consumers,
2071-
Stream,
2072-
SubscriptionId,
2073-
Properties,
2074-
SendFileOct,
2075-
Sac),
2076-
StreamSubscriptions1 =
2077-
case StreamSubscriptions of
2078-
#{Stream := SubscriptionIds} ->
2079-
StreamSubscriptions#{Stream =>
2080-
[SubscriptionId]
2081-
++ SubscriptionIds};
2082-
_ ->
2083-
StreamSubscriptions#{Stream =>
2084-
[SubscriptionId]}
2085-
end,
2086-
{Connection1#stream_connection{stream_subscriptions
2087-
=
2088-
StreamSubscriptions1},
2089-
State1}
2003+
handle_subscription(Transport, Connection,
2004+
State, Request,
2005+
LocalMemberPid)
20902006
end
20912007
end
20922008
end;
@@ -2995,8 +2911,106 @@ maybe_dispatch_on_subscription(_Transport,
29952911
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
29962912
State#stream_connection_state{consumers = Consumers1}.
29972913

2914+
handle_subscription(Transport,#stream_connection{
2915+
name = ConnName,
2916+
socket = Socket,
2917+
stream_subscriptions = StreamSubscriptions,
2918+
virtual_host = VirtualHost,
2919+
send_file_oct = SendFileOct,
2920+
transport = ConnTransport} = Connection,
2921+
#stream_connection_state{consumers = Consumers} = State,
2922+
{request, CorrelationId, {subscribe,
2923+
SubscriptionId,
2924+
Stream,
2925+
OffsetSpec,
2926+
Credit,
2927+
Properties}},
2928+
LocalMemberPid) ->
2929+
Sac = single_active_consumer(Properties),
2930+
ConsumerName = consumer_name(Properties),
2931+
QueueResource = #resource{name = Stream,
2932+
kind = queue,
2933+
virtual_host = VirtualHost},
2934+
case maybe_register_consumer(VirtualHost, Stream, ConsumerName, ConnName,
2935+
SubscriptionId, Properties, Sac) of
2936+
{ok, Active} ->
2937+
Log = case Sac of
2938+
true ->
2939+
undefined;
2940+
false ->
2941+
init_reader(ConnTransport,
2942+
LocalMemberPid,
2943+
QueueResource,
2944+
SubscriptionId,
2945+
Properties,
2946+
OffsetSpec)
2947+
end,
2948+
2949+
ConsumerCounters = atomics:new(2, [{signed, false}]),
2950+
2951+
response_ok(Transport,
2952+
Connection,
2953+
subscribe,
2954+
CorrelationId),
2955+
2956+
ConsumerConfiguration = #consumer_configuration{
2957+
member_pid = LocalMemberPid,
2958+
subscription_id = SubscriptionId,
2959+
socket = Socket,
2960+
stream = Stream,
2961+
offset = OffsetSpec,
2962+
counters = ConsumerCounters,
2963+
properties = Properties,
2964+
active = Active},
2965+
SendLimit = Credit div 2,
2966+
ConsumerState =
2967+
#consumer{configuration = ConsumerConfiguration,
2968+
log = Log,
2969+
send_limit = SendLimit,
2970+
credit = Credit},
2971+
2972+
Connection1 = maybe_monitor_stream(LocalMemberPid,
2973+
Stream,
2974+
Connection),
2975+
2976+
State1 = maybe_dispatch_on_subscription(Transport,
2977+
State,
2978+
ConsumerState,
2979+
Connection1,
2980+
Consumers,
2981+
Stream,
2982+
SubscriptionId,
2983+
Properties,
2984+
SendFileOct,
2985+
Sac),
2986+
StreamSubscriptions1 =
2987+
case StreamSubscriptions of
2988+
#{Stream := SubscriptionIds} ->
2989+
StreamSubscriptions#{Stream =>
2990+
[SubscriptionId]
2991+
++ SubscriptionIds};
2992+
_ ->
2993+
StreamSubscriptions#{Stream =>
2994+
[SubscriptionId]}
2995+
end,
2996+
{Connection1#stream_connection{stream_subscriptions
2997+
=
2998+
StreamSubscriptions1},
2999+
State1};
3000+
{error, Reason} ->
3001+
rabbit_log:warning("Cannot create SAC subcription ~tp: ~tp",
3002+
[SubscriptionId, Reason]),
3003+
response(Transport,
3004+
Connection,
3005+
subscribe,
3006+
CorrelationId,
3007+
?RESPONSE_CODE_PRECONDITION_FAILED),
3008+
increase_protocol_counter(?PRECONDITION_FAILED),
3009+
{Connection, State}
3010+
end.
3011+
29983012
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
2999-
true;
3013+
{ok, true};
30003014
maybe_register_consumer(VirtualHost,
30013015
Stream,
30023016
ConsumerName,
@@ -3005,15 +3019,13 @@ maybe_register_consumer(VirtualHost,
30053019
Properties,
30063020
true) ->
30073021
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
3008-
{ok, Active} =
3009-
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
3010-
Stream,
3011-
PartitionIndex,
3012-
ConsumerName,
3013-
self(),
3014-
ConnectionName,
3015-
SubscriptionId),
3016-
Active.
3022+
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
3023+
Stream,
3024+
PartitionIndex,
3025+
ConsumerName,
3026+
self(),
3027+
ConnectionName,
3028+
SubscriptionId).
30173029

30183030
maybe_send_consumer_update(Transport,
30193031
Connection = #stream_connection{

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ groups() ->
6868
test_publisher_with_too_long_reference_errors,
6969
test_consumer_with_too_long_reference_errors,
7070
subscribe_unsubscribe_should_create_events,
71-
test_stream_test_utils
71+
test_stream_test_utils,
72+
sac_subscription_with_partition_index_conflict_should_return_error
7273
]},
7374
%% Run `test_global_counters` on its own so the global metrics are
7475
%% initialised to 0 for each testcase
@@ -1069,6 +1070,52 @@ test_stream_test_utils(Config) ->
10691070
{ok, _} = stream_test_utils:close(S, C5),
10701071
ok.
10711072

1073+
sac_subscription_with_partition_index_conflict_should_return_error(Config) ->
1074+
T = gen_tcp,
1075+
App = <<"app-1">>,
1076+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
1077+
Ss = atom_to_binary(?FUNCTION_NAME, utf8),
1078+
Partition = unicode:characters_to_binary([Ss, <<"-0">>]),
1079+
SsCreationFrame = request({create_super_stream, Ss, [Partition], [<<"0">>], #{}}),
1080+
ok = T:send(S, SsCreationFrame),
1081+
{Cmd1, C1} = receive_commands(T, S, C0),
1082+
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
1083+
Cmd1),
1084+
1085+
SacSubscribeFrame = request({subscribe, 0, Partition,
1086+
first, 1,
1087+
#{<<"single-active-consumer">> => <<"true">>,
1088+
<<"name">> => App}}),
1089+
ok = T:send(S, SacSubscribeFrame),
1090+
{Cmd2, C2} = receive_commands(T, S, C1),
1091+
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}},
1092+
Cmd2),
1093+
{Cmd3, C3} = receive_commands(T, S, C2),
1094+
?assertMatch({request,0,{consumer_update,0,true}},
1095+
Cmd3),
1096+
1097+
SsSubscribeFrame = request({subscribe, 1, Partition,
1098+
first, 1,
1099+
#{<<"super-stream">> => Ss,
1100+
<<"single-active-consumer">> => <<"true">>,
1101+
<<"name">> => App}}),
1102+
ok = T:send(S, SsSubscribeFrame),
1103+
{Cmd4, C4} = receive_commands(T, S, C3),
1104+
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}},
1105+
Cmd4),
1106+
1107+
{ok, C5} = stream_test_utils:unsubscribe(S, C4, 0),
1108+
1109+
SsDeletionFrame = request({delete_super_stream, Ss}),
1110+
ok = T:send(S, SsDeletionFrame),
1111+
{Cmd5, C5} = receive_commands(T, S, C5),
1112+
?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
1113+
Cmd5),
1114+
1115+
{ok, _} = stream_test_utils:close(S, C5),
1116+
ok.
1117+
1118+
10721119
filtered_events(Config, EventType) ->
10731120
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
10741121
gen_event,

0 commit comments

Comments
 (0)