Skip to content

Commit ea2e5d2

Browse files
committed
Make empty CQ init faster in case of clean shutdown
At CQ startup variable_queue went through each seqid from 0 to next_seq_id looking for the first message even if there were no messages in the queue (no segment files). In case of a clean shutdown the value next_seq_id is stored in recovery terms. This value can be utilized by the queue index to provide better seqid bounds in absence of segment files. Before this patch starting an empty classic queue with next_seq_id = 100_000_000 used to take about 26 seconds. With this patch it takes less than 1ms.
1 parent c458cba commit ea2e5d2

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
%% queue implementation itself.
2121
-export([pre_publish/7, flush_pre_publish_cache/2,
2222
sync/1, needs_sync/1, flush/1,
23-
bounds/1, next_segment_boundary/1]).
23+
bounds/2, next_segment_boundary/1]).
24+
25+
%% Only used by tests
26+
-export([bounds/1]).
2427

2528
%% Used to upgrade/downgrade from/to the v1 index.
2629
-export([init_for_conversion/3]).
@@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
480483
%% When resuming after a crash we need to double check the messages that are both
481484
%% in the v1 and v2 index (effectively the messages below the upper bound of the
482485
%% v1 index that are about to be written to it).
483-
{_, V2HiSeqId, _} = bounds(State0),
486+
{_, V2HiSeqId, _} = bounds(State0, undefined),
484487
SkipFun = fun
485488
(SeqId, FunState0) when SeqId < V2HiSeqId ->
486489
case read(SeqId, SeqId + 1, FunState0) of
@@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) ->
11881191
%% the test suite to pass. This can probably be made more accurate
11891192
%% in the future.
11901193

1194+
%% `bounds/1` is only used by tests
11911195
-spec bounds(State) ->
11921196
{non_neg_integer(), non_neg_integer(), State}
11931197
when State::state().
1198+
bounds(State) ->
1199+
bounds(State, undefined).
11941200

1195-
bounds(State = #qi{ segments = Segments }) ->
1201+
-spec bounds(State, non_neg_integer() | undefiend) ->
1202+
{non_neg_integer(), non_neg_integer(), State}
1203+
when State::state().
1204+
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
11961205
?DEBUG("~0p", [State]),
11971206
%% We must special case when we are empty to make tests happy.
11981207
if
1208+
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
1209+
{NextSeqIdHint, NextSeqIdHint, State};
11991210
Segments =:= #{} ->
12001211
{0, 0, State};
12011212
true ->

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,
11721172

11731173
init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
11741174
PersistentClient, TransientClient, VHost) ->
1175-
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
1175+
NextSeqIdHint =
1176+
case Terms of
1177+
non_clean_shutdown -> undefined;
1178+
_ -> proplists:get_value(next_seq_id, Terms)
1179+
end,
1180+
1181+
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),
11761182

11771183
{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
11781184
case Terms of

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
variable_queue_dropfetchwhile,
3030
variable_queue_dropwhile_restart,
3131
variable_queue_dropwhile_sync_restart,
32+
variable_queue_restart_large_seq_id,
3233
variable_queue_ack_limiting,
3334
variable_queue_purge,
3435
variable_queue_requeue,
@@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
14211422

14221423
VQ5.
14231424

1425+
variable_queue_restart_large_seq_id(Config) ->
1426+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1427+
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
1428+
1429+
variable_queue_restart_large_seq_id1(Config) ->
1430+
with_fresh_variable_queue(
1431+
fun variable_queue_restart_large_seq_id2/2,
1432+
?config(variable_queue_type, Config)).
1433+
1434+
variable_queue_restart_large_seq_id2(VQ0, QName) ->
1435+
Count = 1,
1436+
1437+
%% publish and consume a message
1438+
VQ1 = publish_fetch_and_ack(Count, 0, VQ0),
1439+
%% should be empty now
1440+
true = rabbit_variable_queue:is_empty(VQ1),
1441+
1442+
_VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1),
1443+
Terms = variable_queue_read_terms(QName),
1444+
Count = proplists:get_value(next_seq_id, Terms),
1445+
1446+
%% set a very high next_seq_id as if 100M messages have been
1447+
%% published and consumed
1448+
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),
1449+
1450+
{TInit, VQ3} =
1451+
timer:tc(
1452+
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
1453+
millisecond),
1454+
%% even with a very high next_seq_id start of an empty queue
1455+
%% should be quick (few milliseconds, but let's give it 100ms, to
1456+
%% avoid flaking on slow servers)
1457+
{true, _} = {TInit < 100, TInit},
1458+
1459+
%% should be empty now
1460+
true = rabbit_variable_queue:is_empty(VQ3),
1461+
1462+
VQ3.
1463+
14241464
variable_queue_ack_limiting(Config) ->
14251465
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
14261466
?MODULE, variable_queue_ack_limiting1, [Config]).

0 commit comments

Comments
 (0)