Skip to content

Commit 56964a8

Browse files
authored
Merge pull request #12074 from rabbitmq/issue-11915
Cancel AMQP stream consumer when local stream member is deleted
2 parents f0932e3 + 0061944 commit 56964a8

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ rabbitmq_integration_suite(
823823
additional_beam = [
824824
":test_queue_utils_beam",
825825
],
826-
shard_count = 19,
826+
shard_count = 20,
827827
deps = [
828828
"@proper//:erlang_app",
829829
],

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,6 +1747,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0},
17471747
{queue_event, QRef,
17481748
{stream_local_member_change, MemberPid}},
17491749
cast} | Efs]};
1750+
(_MNode, #member{state = {running, _, MemberPid},
1751+
role = {replica, _},
1752+
target = deleted}, {_, Efs}) ->
1753+
{MemberPid, [{send_msg, P,
1754+
{queue_event, QRef, deleted_replica},
1755+
cast} | Efs]};
17501756
(_N, _M, Acc) ->
17511757
%% not a replica, nothing to do
17521758
Acc

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,9 @@ handle_event(_QName, {stream_local_member_change, Pid},
626626
end, #{}, Readers0),
627627
{ok, State#stream_client{local_pid = Pid, readers = Readers1}, []};
628628
handle_event(_QName, eol, #stream_client{name = Name}) ->
629-
{eol, [{unblock, Name}]}.
629+
{eol, [{unblock, Name}]};
630+
handle_event(QName, deleted_replica, State) ->
631+
{ok, State, [{queue_down, QName}]}.
630632

631633
is_recoverable(Q) ->
632634
Node = node(),

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ all() ->
3434
{group, cluster_size_3},
3535
{group, cluster_size_3_1},
3636
{group, cluster_size_3_2},
37+
{group, cluster_size_3_3},
3738
{group, cluster_size_3_parallel_1},
3839
{group, cluster_size_3_parallel_2},
3940
{group, cluster_size_3_parallel_3},
@@ -79,6 +80,7 @@ groups() ->
7980
{cluster_size_3_2, [], [recover,
8081
declare_with_node_down_1,
8182
declare_with_node_down_2]},
83+
{cluster_size_3_3, [], [consume_while_deleting_replica]},
8284
{cluster_size_3_parallel_1, [parallel], [
8385
delete_replica,
8486
delete_last_replica,
@@ -207,6 +209,7 @@ init_per_group1(Group, Config) ->
207209
cluster_size_3_parallel_5 -> 3;
208210
cluster_size_3_1 -> 3;
209211
cluster_size_3_2 -> 3;
212+
cluster_size_3_3 -> 3;
210213
unclustered_size_3_1 -> 3;
211214
unclustered_size_3_2 -> 3;
212215
unclustered_size_3_3 -> 3;
@@ -1648,6 +1651,45 @@ consume_from_replica(Config) ->
16481651
receive_batch(Ch2, 0, 99),
16491652
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
16501653

1654+
consume_while_deleting_replica(Config) ->
1655+
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1656+
1657+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1658+
Q = ?config(queue_name, Config),
1659+
1660+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1661+
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1662+
1663+
rabbit_ct_helpers:await_condition(
1664+
fun () ->
1665+
Info = find_queue_info(Config, 1, [online]),
1666+
length(proplists:get_value(online, Info)) == 3
1667+
end),
1668+
1669+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
1670+
qos(Ch2, 10, false),
1671+
1672+
CTag = atom_to_binary(?FUNCTION_NAME),
1673+
subscribe(Ch2, Q, false, 0, CTag),
1674+
1675+
%% Delete replica in node 3
1676+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
1677+
delete_replica, [<<"/">>, Q, Server3]),
1678+
1679+
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
1680+
1681+
%% no messages should be received
1682+
receive
1683+
#'basic.cancel'{consumer_tag = CTag} ->
1684+
ok;
1685+
{_, #amqp_msg{}} ->
1686+
exit(unexpected_message)
1687+
after 30000 ->
1688+
exit(missing_consumer_cancel)
1689+
end,
1690+
1691+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
1692+
16511693
consume_credit(Config) ->
16521694
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
16531695

0 commit comments

Comments
 (0)