diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 2117dc37a6c..70c2579dcf3 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -20,7 +20,10 @@ %% queue implementation itself. -export([pre_publish/7, flush_pre_publish_cache/2, sync/1, needs_sync/1, flush/1, - bounds/1, next_segment_boundary/1]). + bounds/2, next_segment_boundary/1]). + +%% Only used by tests +-export([bounds/1]). %% Used to upgrade/downgrade from/to the v1 index. -export([init_for_conversion/3]). @@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin }, %% When resuming after a crash we need to double check the messages that are both %% in the v1 and v2 index (effectively the messages below the upper bound of the %% v1 index that are about to be written to it). - {_, V2HiSeqId, _} = bounds(State0), + {_, V2HiSeqId, _} = bounds(State0, undefined), SkipFun = fun (SeqId, FunState0) when SeqId < V2HiSeqId -> case read(SeqId, SeqId + 1, FunState0) of @@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) -> %% the test suite to pass. This can probably be made more accurate %% in the future. +%% `bounds/1` is only used by tests -spec bounds(State) -> {non_neg_integer(), non_neg_integer(), State} when State::state(). +bounds(State) -> + bounds(State, undefined). -bounds(State = #qi{ segments = Segments }) -> +-spec bounds(State, non_neg_integer() | undefined) -> + {non_neg_integer(), non_neg_integer(), State} + when State::state(). +bounds(State = #qi{ segments = Segments }, NextSeqIdHint) -> ?DEBUG("~0p", [State]), %% We must special case when we are empty to make tests happy. if + Segments =:= #{} andalso is_integer(NextSeqIdHint) -> + {NextSeqIdHint, NextSeqIdHint, State}; Segments =:= #{} -> {0, 0, State}; true -> diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 4f23dbf8f92..2ffca81a3d1 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count, init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient, VHost) -> - {LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState), + NextSeqIdHint = + case Terms of + non_clean_shutdown -> undefined; + _ -> proplists:get_value(next_seq_id, Terms) + end, + + {LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint), {NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} = case Terms of diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 03564429675..adda1cdf8b4 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -29,6 +29,7 @@ variable_queue_dropfetchwhile, variable_queue_dropwhile_restart, variable_queue_dropwhile_sync_restart, + variable_queue_restart_large_seq_id, variable_queue_ack_limiting, variable_queue_purge, variable_queue_requeue, @@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) -> VQ5. +variable_queue_restart_large_seq_id(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_restart_large_seq_id1, [Config]). + +variable_queue_restart_large_seq_id1(Config) -> + with_fresh_variable_queue( + fun variable_queue_restart_large_seq_id2/2, + ?config(variable_queue_type, Config)). + +variable_queue_restart_large_seq_id2(VQ0, QName) -> + Count = 1, + + %% publish and consume a message + VQ1 = publish_fetch_and_ack(Count, 0, VQ0), + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ1), + + _VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1), + Terms = variable_queue_read_terms(QName), + Count = proplists:get_value(next_seq_id, Terms), + + %% set a very high next_seq_id as if 100M messages have been + %% published and consumed + Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}), + + {TInit, VQ3} = + timer:tc( + fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end, + millisecond), + %% even with a very high next_seq_id start of an empty queue + %% should be quick (few milliseconds, but let's give it 100ms, to + %% avoid flaking on slow servers) + {true, _} = {TInit < 100, TInit}, + + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ3), + + VQ3. + variable_queue_ack_limiting(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, variable_queue_ack_limiting1, [Config]).