Skip to content

Make empty CQ init faster in case of clean shutdown #13856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
%% queue implementation itself.
-export([pre_publish/7, flush_pre_publish_cache/2,
sync/1, needs_sync/1, flush/1,
bounds/1, next_segment_boundary/1]).
bounds/2, next_segment_boundary/1]).

%% Only used by tests
-export([bounds/1]).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only backing_queue_SUITE:bq_queue_index test case uses bounds/1. If I understand correctly this test case tests the index module itself. I kept bounds/1 as the v1 index also has a function with the same signature (although that is not tested any more by backing_queue_SUITE and it will go away eventually) Maybe bq_queue_index should be modified to test bounds/2 instead, sometimes the NextSeqIdHint being undefined and sometimes an integer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gomoripeti that sounds reasonable to me. Let's do that in a follow-up PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.


%% Used to upgrade/downgrade from/to the v1 index.
-export([init_for_conversion/3]).
Expand Down Expand Up @@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V2HiSeqId, _} = bounds(State0),
{_, V2HiSeqId, _} = bounds(State0, undefined),
SkipFun = fun
(SeqId, FunState0) when SeqId < V2HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
Expand Down Expand Up @@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) ->
%% the test suite to pass. This can probably be made more accurate
%% in the future.

%% `bounds/1` is only used by tests
-spec bounds(State) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State) ->
bounds(State, undefined).

bounds(State = #qi{ segments = Segments }) ->
-spec bounds(State, non_neg_integer() | undefined) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
?DEBUG("~0p", [State]),
%% We must special case when we are empty to make tests happy.
if
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
{NextSeqIdHint, NextSeqIdHint, State};
Segments =:= #{} ->
{0, 0, State};
true ->
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,

init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient, VHost) ->
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
NextSeqIdHint =
case Terms of
non_clean_shutdown -> undefined;
_ -> proplists:get_value(next_seq_id, Terms)
end,

{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),

{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
case Terms of
Expand Down
40 changes: 40 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
variable_queue_dropfetchwhile,
variable_queue_dropwhile_restart,
variable_queue_dropwhile_sync_restart,
variable_queue_restart_large_seq_id,
variable_queue_ack_limiting,
variable_queue_purge,
variable_queue_requeue,
Expand Down Expand Up @@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->

VQ5.

variable_queue_restart_large_seq_id(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_restart_large_seq_id1, [Config]).

variable_queue_restart_large_seq_id1(Config) ->
with_fresh_variable_queue(
fun variable_queue_restart_large_seq_id2/2,
?config(variable_queue_type, Config)).

variable_queue_restart_large_seq_id2(VQ0, QName) ->
Count = 1,

%% publish and consume a message
VQ1 = publish_fetch_and_ack(Count, 0, VQ0),
%% should be empty now
true = rabbit_variable_queue:is_empty(VQ1),

_VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1),
Terms = variable_queue_read_terms(QName),
Count = proplists:get_value(next_seq_id, Terms),

%% set a very high next_seq_id as if 100M messages have been
%% published and consumed
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably test that the bounds returned by the index are correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will work on a follow-up PR to update backing_queue_SUITE:bq_queue_index to test bounds/2 when there are messages in the queue. But what is a correct index range estimate for an empty queue? All bounds are correct overestimations. Maybe one property that can be checked that both LowSeqId and HighSeqId are '=< NextSeqId`?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only care about v2 so Low = High = Next?


{TInit, VQ3} =
timer:tc(
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
millisecond),
%% even with a very high next_seq_id start of an empty queue
%% should be quick (few milliseconds, but let's give it 100ms, to
%% avoid flaking on slow servers)
{true, _} = {TInit < 100, TInit},

%% should be empty now
true = rabbit_variable_queue:is_empty(VQ3),

VQ3.

variable_queue_ack_limiting(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_ack_limiting1, [Config]).
Expand Down
Loading