@@ -64,6 +64,7 @@ all_tests() ->
64
64
scenario32 ,
65
65
upgrade ,
66
66
messages_total ,
67
+ ra_indexes ,
67
68
simple_prefetch ,
68
69
simple_prefetch_without_checkout_cancel ,
69
70
simple_prefetch_01 ,
@@ -910,6 +911,30 @@ messages_total(_Config) ->
910
911
end )
911
912
end , [], Size ).
912
913
914
+ ra_indexes (_Config ) ->
915
+ meck :expect (rabbit_feature_flags , is_enabled , fun (_ ) -> false end ),
916
+ Size = 256 ,
917
+ run_proper (
918
+ fun () ->
919
+ ? FORALL ({Length , Bytes , DeliveryLimit , SingleActive },
920
+ frequency ([{5 , {undefined , undefined , undefined , false }},
921
+ {5 , {oneof ([range (1 , 10 ), undefined ]),
922
+ oneof ([range (1 , 1000 ), undefined ]),
923
+ oneof ([range (1 , 3 ), undefined ]),
924
+ oneof ([true , false ])
925
+ }}]),
926
+ begin
927
+ Config = config (? FUNCTION_NAME ,
928
+ Length ,
929
+ Bytes ,
930
+ SingleActive ,
931
+ DeliveryLimit ),
932
+ ? FORALL (O , ? LET (Ops , log_gen (Size ), expand (Ops , Config )),
933
+ collect ({log_size , length (O )},
934
+ ra_indexes_prop (Config , O )))
935
+ end )
936
+ end , [], Size ).
937
+
913
938
simple_prefetch (_Config ) ->
914
939
Size = 500 ,
915
940
meck :expect (rabbit_feature_flags , is_enabled , fun (_ ) -> true end ),
@@ -1464,6 +1489,38 @@ messages_total_invariant() ->
1464
1489
end
1465
1490
end .
1466
1491
1492
+ ra_indexes_prop (Conf0 , Commands ) ->
1493
+ Conf = Conf0 #{release_cursor_interval => 100 },
1494
+ Indexes = lists :seq (1 , length (Commands )),
1495
+ Entries = lists :zip (Indexes , Commands ),
1496
+ InitState = test_init (Conf ),
1497
+ run_log (InitState , Entries , ra_indexes_invariant ()),
1498
+ true .
1499
+
1500
+ ra_indexes_invariant () ->
1501
+ % % The raft indexes contained in the `ra_indexes` `rabbit_fifo_index` must
1502
+ % % be the same as all indexes checked out by consumers plus those in the
1503
+ % % `returns` queue.
1504
+ fun (# rabbit_fifo {ra_indexes = Index ,
1505
+ consumers = C ,
1506
+ returns = R }) ->
1507
+ RIdxs = lqueue :fold (fun (? MSG (I , _ ), Acc ) -> [I | Acc ] end , [], R ),
1508
+ CIdxs = maps :fold (fun (_ , # consumer {checked_out = Ch }, Acc0 ) ->
1509
+ maps :fold (fun (_ , ? MSG (I , _ ), Acc ) ->
1510
+ [I | Acc ]
1511
+ end , Acc0 , Ch )
1512
+ end , [], C ),
1513
+ ActualIdxs = lists :sort (RIdxs ++ CIdxs ),
1514
+ IndexIdxs = lists :sort (rabbit_fifo_index :to_list (Index )),
1515
+ case ActualIdxs == IndexIdxs of
1516
+ true -> true ;
1517
+ false ->
1518
+ ct :pal (" ra_indexes invariant failed Expected ~b Got ~b " ,
1519
+ [ActualIdxs , IndexIdxs ]),
1520
+ false
1521
+ end
1522
+ end .
1523
+
1467
1524
simple_prefetch_prop (Conf0 , Commands , WithCheckoutCancel ) ->
1468
1525
Conf = Conf0 #{release_cursor_interval => 100 },
1469
1526
Indexes = lists :seq (1 , length (Commands )),
0 commit comments