From 150172f008788a6198c67562867ba8e6efa7b2e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Wed, 30 Apr 2025 18:22:43 +0200 Subject: [PATCH] Make empty CQ init faster in case of clean shutdown At CQ startup variable_queue went through each seqid from 0 to next_seq_id looking for the first message even if there were no messages in the queue (no segment files). In case of a clean shutdown the value next_seq_id is stored in recovery terms. This value can be utilized by the queue index to provide better seqid bounds in absence of segment files. Before this patch starting an empty classic queue with next_seq_id = 100_000_000 used to take about 26 seconds. With this patch it takes less than 1ms. --- .../src/rabbit_classic_queue_index_v2.erl | 17 ++++++-- deps/rabbit/src/rabbit_variable_queue.erl | 8 +++- deps/rabbit/test/backing_queue_SUITE.erl | 40 +++++++++++++++++++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 2117dc37a6cf..70c2579dcf30 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -20,7 +20,10 @@ %% queue implementation itself. -export([pre_publish/7, flush_pre_publish_cache/2, sync/1, needs_sync/1, flush/1, - bounds/1, next_segment_boundary/1]). + bounds/2, next_segment_boundary/1]). + +%% Only used by tests +-export([bounds/1]). %% Used to upgrade/downgrade from/to the v1 index. -export([init_for_conversion/3]). @@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin }, %% When resuming after a crash we need to double check the messages that are both %% in the v1 and v2 index (effectively the messages below the upper bound of the %% v1 index that are about to be written to it). - {_, V2HiSeqId, _} = bounds(State0), + {_, V2HiSeqId, _} = bounds(State0, undefined), SkipFun = fun (SeqId, FunState0) when SeqId < V2HiSeqId -> case read(SeqId, SeqId + 1, FunState0) of @@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) -> %% the test suite to pass. This can probably be made more accurate %% in the future. +%% `bounds/1` is only used by tests -spec bounds(State) -> {non_neg_integer(), non_neg_integer(), State} when State::state(). +bounds(State) -> + bounds(State, undefined). -bounds(State = #qi{ segments = Segments }) -> +-spec bounds(State, non_neg_integer() | undefined) -> + {non_neg_integer(), non_neg_integer(), State} + when State::state(). +bounds(State = #qi{ segments = Segments }, NextSeqIdHint) -> ?DEBUG("~0p", [State]), %% We must special case when we are empty to make tests happy. if + Segments =:= #{} andalso is_integer(NextSeqIdHint) -> + {NextSeqIdHint, NextSeqIdHint, State}; Segments =:= #{} -> {0, 0, State}; true -> diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 4f23dbf8f92a..2ffca81a3d1c 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count, init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient, VHost) -> - {LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState), + NextSeqIdHint = + case Terms of + non_clean_shutdown -> undefined; + _ -> proplists:get_value(next_seq_id, Terms) + end, + + {LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint), {NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} = case Terms of diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 035644296754..adda1cdf8b41 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -29,6 +29,7 @@ variable_queue_dropfetchwhile, variable_queue_dropwhile_restart, variable_queue_dropwhile_sync_restart, + variable_queue_restart_large_seq_id, variable_queue_ack_limiting, variable_queue_purge, variable_queue_requeue, @@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) -> VQ5. +variable_queue_restart_large_seq_id(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_restart_large_seq_id1, [Config]). + +variable_queue_restart_large_seq_id1(Config) -> + with_fresh_variable_queue( + fun variable_queue_restart_large_seq_id2/2, + ?config(variable_queue_type, Config)). + +variable_queue_restart_large_seq_id2(VQ0, QName) -> + Count = 1, + + %% publish and consume a message + VQ1 = publish_fetch_and_ack(Count, 0, VQ0), + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ1), + + _VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1), + Terms = variable_queue_read_terms(QName), + Count = proplists:get_value(next_seq_id, Terms), + + %% set a very high next_seq_id as if 100M messages have been + %% published and consumed + Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}), + + {TInit, VQ3} = + timer:tc( + fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end, + millisecond), + %% even with a very high next_seq_id start of an empty queue + %% should be quick (few milliseconds, but let's give it 100ms, to + %% avoid flaking on slow servers) + {true, _} = {TInit < 100, TInit}, + + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ3), + + VQ3. + variable_queue_ack_limiting(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, variable_queue_ack_limiting1, [Config]).