10
10
-include_lib (" common_test/include/ct.hrl" ).
11
11
-include_lib (" eunit/include/eunit.hrl" ).
12
12
-include_lib (" amqp_client/include/amqp_client.hrl" ).
13
- -compile ([nowarn_export_all , export_all ]).
13
+ -include_lib (" rabbitmq_ct_helpers/include/rabbit_assert.hrl" ).
14
+
15
+ -export ([suite /0 ,
16
+ all /0 ,
17
+ groups /0 ,
18
+ init_per_suite /1 ,
19
+ end_per_suite /1 ,
20
+ init_per_group /2 ,
21
+ end_per_group /2 ,
22
+ init_per_testcase /2 ,
23
+ end_per_testcase /2 ,
24
+
25
+ join_khepri_khepri_cluster /1 ,
26
+ join_mnesia_khepri_cluster /1 ,
27
+ join_mnesia_khepri_cluster_reverse /1 ,
28
+ join_khepri_mnesia_cluster /1 ,
29
+ join_khepri_mnesia_cluster_reverse /1 ,
30
+
31
+ join_khepri_khepri_khepri_cluster /1 ,
32
+ join_mnesia_khepri_khepri_cluster /1 ,
33
+ join_mnesia_khepri_khepri_cluster_reverse /1 ,
34
+ join_khepri_mnesia_khepri_cluster /1 ,
35
+ join_khepri_mnesia_khepri_cluster_reverse /1 ,
36
+ join_khepri_khepri_mnesia_cluster /1 ,
37
+ join_khepri_khepri_mnesia_cluster_reverse /1 ,
38
+ join_mnesia_mnesia_khepri_cluster /1 ,
39
+ join_mnesia_mnesia_khepri_cluster_reverse /1 ,
40
+ join_mnesia_khepri_mnesia_cluster /1 ,
41
+ join_mnesia_khepri_mnesia_cluster_reverse /1 ,
42
+ join_khepri_mnesia_mnesia_cluster /1 ,
43
+ join_khepri_mnesia_mnesia_cluster_reverse /1 ,
44
+
45
+ join_khepri_while_in_minority /1
46
+ ]).
14
47
15
48
suite () ->
16
49
[{timetrap , 5 * 60_000 }].
@@ -23,7 +56,8 @@ all() ->
23
56
groups () ->
24
57
[
25
58
{unclustered , [], [{cluster_size_2 , [], cluster_size_2_tests ()},
26
- {cluster_size_3 , [], cluster_size_3_tests ()}]}
59
+ {cluster_size_3 , [], cluster_size_3_tests ()},
60
+ {cluster_size_5 , [], cluster_size_5_tests ()}]}
27
61
].
28
62
29
63
cluster_size_2_tests () ->
@@ -52,6 +86,11 @@ cluster_size_3_tests() ->
52
86
join_khepri_mnesia_mnesia_cluster_reverse
53
87
].
54
88
89
+ cluster_size_5_tests () ->
90
+ [
91
+ join_khepri_while_in_minority
92
+ ].
93
+
55
94
% % -------------------------------------------------------------------
56
95
% % Testsuite setup/teardown.
57
96
% % -------------------------------------------------------------------
@@ -78,7 +117,9 @@ init_per_group(unclustered, Config) ->
78
117
init_per_group (cluster_size_2 , Config ) ->
79
118
rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 2 }]);
80
119
init_per_group (cluster_size_3 , Config ) ->
81
- rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 3 }]).
120
+ rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 3 }]);
121
+ init_per_group (cluster_size_5 , Config ) ->
122
+ rabbit_ct_helpers :set_config (Config , [{rmq_nodes_count , 5 }]).
82
123
83
124
end_per_group (_ , Config ) ->
84
125
Config .
@@ -343,3 +384,121 @@ declare(Ch, Q) ->
343
384
durable = true ,
344
385
auto_delete = false ,
345
386
arguments = []}).
387
+
388
+ join_khepri_while_in_minority (Config ) ->
389
+ [Node1 | ClusteredNodes ] = rabbit_ct_broker_helpers :get_node_configs (
390
+ Config , nodename ),
391
+ [NodeToJoin | OtherNodes ] = ClusteredNodes ,
392
+
393
+ % % Cluster nodes 2 to 5.
394
+ ct :pal (" Cluster nodes ~p " , [ClusteredNodes ]),
395
+ lists :foreach (
396
+ fun (Node ) ->
397
+ ? assertEqual (
398
+ ok ,
399
+ rabbit_control_helper :command (
400
+ join_cluster , Node , [atom_to_list (NodeToJoin )], []))
401
+ end , OtherNodes ),
402
+ lists :foreach (
403
+ fun (Node ) ->
404
+ ? awaitMatch (
405
+ ClusteredNodes ,
406
+ lists :sort (
407
+ rabbit_ct_broker_helpers :rpc (
408
+ Config , Node , rabbit_nodes , list_members , [])),
409
+ 30000 )
410
+ end , ClusteredNodes ),
411
+
412
+ % % Enable Khepri on all nodes. Only `Node2' is given here because it is
413
+ % % clustered with `OtherNodes'.
414
+ ct :pal (" Enable `khepri_db` on nodes ~0p and ~0p " , [Node1 , NodeToJoin ]),
415
+ Ret1 = rabbit_ct_broker_helpers :enable_feature_flag (
416
+ Config , [Node1 , NodeToJoin ], khepri_db ),
417
+ case Ret1 of
418
+ ok ->
419
+ StoreId = rabbit_khepri :get_store_id (),
420
+ LeaderId = rabbit_ct_broker_helpers :rpc (
421
+ Config , NodeToJoin ,
422
+ ra_leaderboard , lookup_leader , [StoreId ]),
423
+ {StoreId , LeaderNode } = LeaderId ,
424
+
425
+ % % Stop all clustered nodes except one follower to create a
426
+ % % minority. In other words, we stop two followers, then the
427
+ % % leader.
428
+ % %
429
+ % % Using `lists:reverse/1', we keep the last running followe only
430
+ % % to see how clustering works if the first nodes in the cluster
431
+ % % are down.
432
+ Followers = ClusteredNodes -- [LeaderNode ],
433
+ [FollowerToKeep | FollowersToStop ] = lists :reverse (Followers ),
434
+
435
+ lists :foreach (
436
+ fun (Node ) ->
437
+ ct :pal (" Stop node ~0p " , [Node ]),
438
+ ok = rabbit_ct_broker_helpers :stop_node (Config , Node )
439
+ end , FollowersToStop ++ [LeaderNode ]),
440
+
441
+ % % Try and fail to cluster `Node1' with the others.
442
+ ct :pal (" Try to cluster node ~0p with ~0p " , [Node1 , FollowerToKeep ]),
443
+ Ret2 = rabbit_control_helper :command (
444
+ join_cluster , Node1 , [atom_to_list (FollowerToKeep )], []),
445
+ ? assertMatch ({error , 75 , _ }, Ret2 ),
446
+ {error , _ , Msg } = Ret2 ,
447
+ ? assertEqual (
448
+ match ,
449
+ re :run (
450
+ Msg , " Khepri cluster could be in minority" ,
451
+ [{capture , none }])),
452
+
453
+ % % `Node1' should still be up and running correctly.
454
+ ct :pal (" Open a connection + channel to node ~0p " , [Node1 ]),
455
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (
456
+ Config , Node1 ),
457
+
458
+ QName = atom_to_binary (? FUNCTION_NAME ),
459
+ QArgs = [{<<" x-queue-type" >>, longstr , <<" quorum" >>}],
460
+ ct :pal (" Declare queue ~0p " , [QName ]),
461
+ amqp_channel :call (
462
+ Ch , # 'queue.declare' {durable = true ,
463
+ queue = QName ,
464
+ arguments = QArgs }),
465
+
466
+ ct :pal (" Enable publish confirms" ),
467
+ amqp_channel :call (Ch , # 'confirm.select' {}),
468
+
469
+ ct :pal (" Publish a message to queue ~0p " , [QName ]),
470
+ amqp_channel :cast (
471
+ Ch ,
472
+ # 'basic.publish' {routing_key = QName },
473
+ # amqp_msg {props = # 'P_basic' {delivery_mode = 2 }}),
474
+ amqp_channel :wait_for_confirms (Ch ),
475
+
476
+ ct :pal (" Subscribe to queue ~0p " , [QName ]),
477
+ CTag = <<" ctag" >>,
478
+ amqp_channel :subscribe (
479
+ Ch ,
480
+ # 'basic.consume' {queue = QName ,
481
+ consumer_tag = CTag },
482
+ self ()),
483
+ receive
484
+ # 'basic.consume_ok' {consumer_tag = CTag } ->
485
+ ok
486
+ after 10000 ->
487
+ exit (consume_ok_timeout )
488
+ end ,
489
+
490
+ ct :pal (" Consume a message from queue ~0p " , [QName ]),
491
+ receive
492
+ {# 'basic.deliver' {consumer_tag = <<" ctag" >>}, _ } ->
493
+ ok
494
+ after 10000 ->
495
+ exit (deliver_timeout )
496
+ end ,
497
+
498
+ ct :pal (" Close channel + connection" ),
499
+ rabbit_ct_client_helpers :close_connection_and_channel (Conn , Ch ),
500
+
501
+ ok ;
502
+ {skip , _ } = Skip ->
503
+ Skip
504
+ end .
0 commit comments