@@ -932,7 +932,7 @@ which_module(5) -> ?MODULE.
932
932
smallest_index :: undefined | ra :index (),
933
933
messages_total :: non_neg_integer (),
934
934
indexes = ? CHECK_MIN_INDEXES :: non_neg_integer (),
935
- unused_1 = ? NIL }).
935
+ bytes_in = 0 :: non_neg_integer () }).
936
936
-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
937
937
-record (aux , {name :: atom (),
938
938
capacity :: term (),
@@ -943,7 +943,9 @@ which_module(5) -> ?MODULE.
943
943
gc = # aux_gc {} :: # aux_gc {},
944
944
tick_pid :: undefined | pid (),
945
945
cache = #{} :: map (),
946
- last_checkpoint :: # checkpoint {}}).
946
+ last_checkpoint :: # checkpoint {},
947
+ bytes_in = 0 :: non_neg_integer (),
948
+ bytes_out = 0 :: non_neg_integer ()}).
947
949
948
950
init_aux (Name ) when is_atom (Name ) ->
949
951
% % TODO: catch specific exception throw if table already exists
@@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) ->
956
958
last_checkpoint = # checkpoint {index = 0 ,
957
959
timestamp = erlang :system_time (millisecond ),
958
960
messages_total = 0 ,
959
- unused_1 = ? NIL }}.
961
+ bytes_in = 0 }}.
960
962
961
963
handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
962
964
capacity = Cap ,
@@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
973
975
handle_aux (RaftState , Tag , Cmd , AuxV3 , RaAux );
974
976
handle_aux (leader , cast , eval ,
975
977
#? AUX {last_decorators_state = LastDec ,
978
+ bytes_in = BytesIn ,
976
979
last_checkpoint = Check0 } = Aux0 ,
977
980
RaAux ) ->
978
981
#? STATE {cfg = # cfg {resource = QName }} = MacState =
979
982
ra_aux :machine_state (RaAux ),
980
983
981
984
Ts = erlang :system_time (millisecond ),
982
- {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , false ),
985
+ {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
983
986
984
987
% % this is called after each batch of commands have been applied
985
988
% % set timer for message expire
@@ -995,11 +998,16 @@ handle_aux(leader, cast, eval,
995
998
last_decorators_state = NewLast }, RaAux , Effects }
996
999
end ;
997
1000
handle_aux (_RaftState , cast , eval ,
998
- #? AUX {last_checkpoint = Check0 } = Aux0 ,
1001
+ #? AUX {last_checkpoint = Check0 ,
1002
+ bytes_in = BytesIn } = Aux0 ,
999
1003
RaAux ) ->
1000
1004
Ts = erlang :system_time (millisecond ),
1001
- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , false ),
1005
+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
1002
1006
{no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects };
1007
+ handle_aux (_RaftState , cast , {bytes_in , {MetaSize , BodySize }},
1008
+ #? AUX {bytes_in = Bytes } = Aux0 ,
1009
+ RaAux ) ->
1010
+ {no_reply , Aux0 #? AUX {bytes_in = Bytes + MetaSize + BodySize }, RaAux , []};
1003
1011
handle_aux (_RaftState , cast , {# return {msg_ids = MsgIds ,
1004
1012
consumer_key = Key } = Ret , Corr , Pid },
1005
1013
Aux0 , RaAux0 ) ->
@@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
1129
1137
handle_aux (_ , _ , garbage_collection , Aux , RaAux ) ->
1130
1138
{no_reply , force_eval_gc (RaAux , Aux ), RaAux };
1131
1139
handle_aux (_RaState , _ , force_checkpoint ,
1132
- #? AUX {last_checkpoint = Check0 } = Aux , RaAux ) ->
1140
+ #? AUX {last_checkpoint = Check0 ,
1141
+ bytes_in = BytesIn } = Aux , RaAux ) ->
1133
1142
Ts = erlang :system_time (millisecond ),
1134
1143
#? STATE {cfg = # cfg {resource = QR }} = ra_aux :machine_state (RaAux ),
1135
1144
rabbit_log :debug (" ~ts : rabbit_fifo: forcing checkpoint at ~b " ,
1136
1145
[rabbit_misc :rs (QR ), ra_aux :last_applied (RaAux )]),
1137
- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , true ),
1146
+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , true ),
1138
1147
{no_reply , Aux #? AUX {last_checkpoint = Check }, RaAux , Effects };
1139
1148
handle_aux (RaState , _ , {dlx , _ } = Cmd , Aux0 , RaAux ) ->
1140
1149
#? STATE {dlx = DlxState ,
@@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
1578
1587
apply_enqueue (#{index := RaftIdx ,
1579
1588
system_time := Ts } = Meta , From ,
1580
1589
Seq , RawMsg , Size , State0 ) ->
1581
- case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size , [], State0 ) of
1590
+ Effects0 = [{aux , {bytes_in , Size }}],
1591
+ case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size ,
1592
+ Effects0 , State0 ) of
1582
1593
{ok , State1 , Effects1 } ->
1583
1594
checkout (Meta , State0 , State1 , Effects1 );
1584
1595
{out_of_sequence , State , Effects } ->
@@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
2918
2929
end .
2919
2930
2920
2931
2921
- do_checkpoints (Ts ,
2922
- # checkpoint {index = ChIdx ,
2923
- timestamp = ChTime ,
2924
- smallest_index = LastSmallest ,
2925
- indexes = MinIndexes } = Check0 , RaAux , Force ) ->
2932
+ do_checkpoints (Ts , # checkpoint {index = ChIdx ,
2933
+ timestamp = ChTime ,
2934
+ smallest_index = LastSmallest ,
2935
+ bytes_in = LastBytesIn ,
2936
+ indexes = MinIndexes } = Check0 ,
2937
+ RaAux , BytesIn , Force ) ->
2926
2938
LastAppliedIdx = ra_aux :last_applied (RaAux ),
2927
2939
IndexesSince = LastAppliedIdx - ChIdx ,
2928
2940
#? STATE {} = MacState = ra_aux :machine_state (RaAux ),
@@ -2934,21 +2946,35 @@ do_checkpoints(Ts,
2934
2946
Smallest
2935
2947
end ,
2936
2948
MsgsTot = messages_total (MacState ),
2949
+ % % more than 64MB (by default) of message data has been written to the log
2950
+ % % best take a checkpoint
2951
+
2937
2952
{CheckMinInterval , CheckMinIndexes , CheckMaxIndexes } =
2938
2953
persistent_term :get (quorum_queue_checkpoint_config ,
2939
2954
{? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
2940
2955
? CHECK_MAX_INDEXES }),
2956
+
2957
+ % % scale the bytes limit as the backlog increases
2958
+ MaxBytesFactor = max (1 , MsgsTot / CheckMaxIndexes ),
2959
+ EnoughDataWritten = BytesIn - LastBytesIn > (? CHECK_MAX_BYTES * MaxBytesFactor ),
2941
2960
EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
2942
2961
2943
- % % enough time has passed and enough indexes have been committed
2944
- case (IndexesSince > MinIndexes andalso
2945
- EnoughTimeHasPassed ) orelse
2946
- % % the queue is empty and some commands have been
2947
- % % applied since the last checkpoint
2948
- (MsgsTot == 0 andalso
2949
- IndexesSince > CheckMinIndexes andalso
2950
- EnoughTimeHasPassed ) orelse
2951
- Force of
2962
+ case (EnoughTimeHasPassed andalso
2963
+ (
2964
+ % % condition 1: enough indexes have been committed since the last
2965
+ % % checkpoint
2966
+ (IndexesSince > MinIndexes ) orelse
2967
+ % % condition 2: the queue is empty and _some_ commands
2968
+ % % have been applied since the last checkpoint
2969
+ (MsgsTot == 0 andalso IndexesSince > 32 )
2970
+ )
2971
+ ) orelse
2972
+ % % condition 3: enough message data has been written to warrant a new
2973
+ % % checkpoint, this ignores the time windowing
2974
+ EnoughDataWritten orelse
2975
+ % % force was requested, e.g. after a purge
2976
+ Force
2977
+ of
2952
2978
true ->
2953
2979
% % take fewer checkpoints the more messages there are on queue
2954
2980
NextIndexes = min (max (MsgsTot , CheckMinIndexes ), CheckMaxIndexes ),
@@ -2957,6 +2983,7 @@ do_checkpoints(Ts,
2957
2983
timestamp = Ts ,
2958
2984
smallest_index = NewSmallest ,
2959
2985
messages_total = MsgsTot ,
2986
+ bytes_in = BytesIn ,
2960
2987
indexes = NextIndexes },
2961
2988
[{checkpoint , LastAppliedIdx , MacState } |
2962
2989
release_cursor (LastSmallest , NewSmallest )]};
0 commit comments