Skip to content

Commit b804543

Browse files
Merge pull request #13870 from rabbitmq/mergify/bp/v4.1.x/pr-13856
Make empty CQ init faster in case of clean shutdown (backport #13856)
2 parents cabe85a + e6cbf50 commit b804543

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
%% queue implementation itself.
2121
-export([pre_publish/7, flush_pre_publish_cache/2,
2222
sync/1, needs_sync/1, flush/1,
23-
bounds/1, next_segment_boundary/1]).
23+
bounds/2, next_segment_boundary/1]).
24+
25+
%% Only used by tests
26+
-export([bounds/1]).
2427

2528
%% Used to upgrade/downgrade from/to the v1 index.
2629
-export([init_for_conversion/3]).
@@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
480483
%% When resuming after a crash we need to double check the messages that are both
481484
%% in the v1 and v2 index (effectively the messages below the upper bound of the
482485
%% v1 index that are about to be written to it).
483-
{_, V2HiSeqId, _} = bounds(State0),
486+
{_, V2HiSeqId, _} = bounds(State0, undefined),
484487
SkipFun = fun
485488
(SeqId, FunState0) when SeqId < V2HiSeqId ->
486489
case read(SeqId, SeqId + 1, FunState0) of
@@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) ->
11881191
%% the test suite to pass. This can probably be made more accurate
11891192
%% in the future.
11901193

1194+
%% `bounds/1` is only used by tests
11911195
-spec bounds(State) ->
11921196
{non_neg_integer(), non_neg_integer(), State}
11931197
when State::state().
1198+
bounds(State) ->
1199+
bounds(State, undefined).
11941200

1195-
bounds(State = #qi{ segments = Segments }) ->
1201+
-spec bounds(State, non_neg_integer() | undefined) ->
1202+
{non_neg_integer(), non_neg_integer(), State}
1203+
when State::state().
1204+
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
11961205
?DEBUG("~0p", [State]),
11971206
%% We must special case when we are empty to make tests happy.
11981207
if
1208+
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
1209+
{NextSeqIdHint, NextSeqIdHint, State};
11991210
Segments =:= #{} ->
12001211
{0, 0, State};
12011212
true ->

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,
11721172

11731173
init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
11741174
PersistentClient, TransientClient, VHost) ->
1175-
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
1175+
NextSeqIdHint =
1176+
case Terms of
1177+
non_clean_shutdown -> undefined;
1178+
_ -> proplists:get_value(next_seq_id, Terms)
1179+
end,
1180+
1181+
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),
11761182

11771183
{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
11781184
case Terms of

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
variable_queue_dropfetchwhile,
3030
variable_queue_dropwhile_restart,
3131
variable_queue_dropwhile_sync_restart,
32+
variable_queue_restart_large_seq_id,
3233
variable_queue_ack_limiting,
3334
variable_queue_purge,
3435
variable_queue_requeue,
@@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
14211422

14221423
VQ5.
14231424

1425+
variable_queue_restart_large_seq_id(Config) ->
1426+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1427+
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
1428+
1429+
variable_queue_restart_large_seq_id1(Config) ->
1430+
with_fresh_variable_queue(
1431+
fun variable_queue_restart_large_seq_id2/2,
1432+
?config(variable_queue_type, Config)).
1433+
1434+
variable_queue_restart_large_seq_id2(VQ0, QName) ->
1435+
Count = 1,
1436+
1437+
%% publish and consume a message
1438+
VQ1 = publish_fetch_and_ack(Count, 0, VQ0),
1439+
%% should be empty now
1440+
true = rabbit_variable_queue:is_empty(VQ1),
1441+
1442+
_VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1),
1443+
Terms = variable_queue_read_terms(QName),
1444+
Count = proplists:get_value(next_seq_id, Terms),
1445+
1446+
%% set a very high next_seq_id as if 100M messages have been
1447+
%% published and consumed
1448+
Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}),
1449+
1450+
{TInit, VQ3} =
1451+
timer:tc(
1452+
fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end,
1453+
millisecond),
1454+
%% even with a very high next_seq_id start of an empty queue
1455+
%% should be quick (few milliseconds, but let's give it 100ms, to
1456+
%% avoid flaking on slow servers)
1457+
{true, _} = {TInit < 100, TInit},
1458+
1459+
%% should be empty now
1460+
true = rabbit_variable_queue:is_empty(VQ3),
1461+
1462+
VQ3.
1463+
14241464
variable_queue_ack_limiting(Config) ->
14251465
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
14261466
?MODULE, variable_queue_ack_limiting1, [Config]).

0 commit comments

Comments
 (0)