Skip to content

Make empty CQ init faster in case of clean shutdown (backport #13856) #13870

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
%% queue implementation itself.
-export([pre_publish/7, flush_pre_publish_cache/2,
sync/1, needs_sync/1, flush/1,
bounds/1, next_segment_boundary/1]).
bounds/2, next_segment_boundary/1]).

%% Only used by tests
-export([bounds/1]).

%% Used to upgrade/downgrade from/to the v1 index.
-export([init_for_conversion/3]).
Expand Down Expand Up @@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V2HiSeqId, _} = bounds(State0),
{_, V2HiSeqId, _} = bounds(State0, undefined),
SkipFun = fun
(SeqId, FunState0) when SeqId < V2HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
Expand Down Expand Up @@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) ->
%% the test suite to pass. This can probably be made more accurate
%% in the future.

%% `bounds/1` is only used by tests
-spec bounds(State) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State) ->
bounds(State, undefined).

bounds(State = #qi{ segments = Segments }) ->
-spec bounds(State, non_neg_integer() | undefined) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
?DEBUG("~0p", [State]),
%% We must special case when we are empty to make tests happy.
if
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
{NextSeqIdHint, NextSeqIdHint, State};
Segments =:= #{} ->
{0, 0, State};
true ->
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,

init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient, VHost) ->
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
NextSeqIdHint =
case Terms of
non_clean_shutdown -> undefined;
_ -> proplists:get_value(next_seq_id, Terms)
end,

{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),

{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
case Terms of
Expand Down
40 changes: 40 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
variable_queue_dropfetchwhile,
variable_queue_dropwhile_restart,
variable_queue_dropwhile_sync_restart,
variable_queue_restart_large_seq_id,
variable_queue_ack_limiting,
variable_queue_purge,
variable_queue_requeue,
Expand Down Expand Up @@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->

VQ5.

variable_queue_restart_large_seq_id(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_restart_large_seq_id1, [Config]).

variable_queue_restart_large_seq_id1(Config) ->
with_fresh_variable_queue(
fun variable_queue_restart_large_seq_id2/2,
?config(variable_queue_type, Config)).

variable_queue_restart_large_seq_id2(VQ0, QName) ->
Count = 1,

%% publish and consume a message
VQ1 = publish_fetch_and_ack(Count, 0, VQ0),
%% should be empty now
true = rabbit_variable_queue:is_empty(VQ1),

_VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1),
Terms = variable_queue_read_terms(QName),
Count = proplists:get_value(next_seq_id, Terms),

%% set a very high next_seq_id as if 100M messages have been
%% published and consumed
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),

{TInit, VQ3} =
timer:tc(
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
millisecond),
%% even with a very high next_seq_id start of an empty queue
%% should be quick (few milliseconds, but let's give it 100ms, to
%% avoid flaking on slow servers)
{true, _} = {TInit < 100, TInit},

%% should be empty now
true = rabbit_variable_queue:is_empty(VQ3),

VQ3.

variable_queue_ack_limiting(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_ack_limiting1, [Config]).
Expand Down
Loading