@@ -83,50 +83,78 @@ end_per_testcase(TestCase, Config) ->
83
83
rabbit_ct_helpers :testcase_finished (Config , TestCase ).
84
84
85
85
simple_sac_consumer_should_get_disconnected_on_partition (Config ) ->
86
- T = ? TRSPT ,
87
86
S = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
88
- {ok , S0 , C0 } = stream_test_utils :connect (Config , 0 ),
89
- {ok , S1 , C1 } = stream_test_utils :connect (Config , 1 ),
90
- {ok , S2 , C2 } = stream_test_utils :connect (Config , 2 ),
87
+ {ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
88
+ {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
89
+ {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
91
90
92
91
create_stream (Config , S ),
93
- wait_for_members (S0 , C0 , S , 3 ),
92
+ wait_for_members (So0 , C0_00 , S , 3 ),
94
93
95
- C0_01 = register_sac (S0 , C0 , S ),
96
- C0_02 = receive_consumer_update (S0 , C0_01 ),
94
+ C0_01 = register_sac (So0 , C0_00 , S , 0 ),
95
+ C0_02 = receive_consumer_update (So0 , C0_01 ),
97
96
98
- C1_01 = register_sac (S1 , C1 , S ),
99
- C2_01 = register_sac (S2 , C2 , S ),
97
+ C1_01 = register_sac (So1 , C1_00 , S , 1 ),
98
+ C2_01 = register_sac (So2 , C2_00 , S , 2 ),
99
+ SubIdToState0 = #{0 => {So0 , C0_02 },
100
+ 1 => {So1 , C1_01 },
101
+ 2 => {So2 , C2_01 }},
100
102
101
103
Members = stream_members (Config , S ),
102
104
L = leader (Members ),
103
105
[F1 , F2 ] = followers (Members ),
104
106
105
- Consumers = query_consumers (Config , S ),
106
- assertSize (3 , Consumers ),
107
- assertConsumersConnected (Consumers ),
107
+ Consumers1 = query_consumers (Config , S ),
108
+ assertSize (3 , Consumers1 ),
109
+ assertConsumersConnected (Consumers1 ),
110
+
111
+ Isolated = F1 ,
112
+ {value , DisconnectedConsumer } =
113
+ lists :search (fun (# consumer {pid = ConnPid }) ->
114
+ rpc (Config , erlang , node , [ConnPid ]) =:= Isolated
115
+ end , Consumers1 ),
116
+ # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
108
117
109
- rabbit_ct_broker_helpers :block_traffic_between (F1 , L ),
110
- rabbit_ct_broker_helpers :block_traffic_between (F1 , F2 ),
118
+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , L ),
119
+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2 ),
111
120
112
121
wait_for_disconnected_consumer (Config , S ),
113
122
wait_for_forgotten_consumer (Config , S ),
114
123
115
- rabbit_ct_broker_helpers :allow_traffic_between (F1 , L ),
116
- rabbit_ct_broker_helpers :allow_traffic_between (F1 , F2 ),
124
+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , L ),
125
+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2 ),
117
126
118
127
wait_for_all_consumers_connected (Config , S ),
119
- assertConsumersConnected (query_consumers (Config , S )),
128
+
129
+ Consumers2 = query_consumers (Config , S ),
130
+ % % the disconnected, then forgotten consumer is cancelled,
131
+ % % because the stream member on its node has been restarted
132
+ assertSize (2 , Consumers2 ),
133
+ assertConsumersConnected (Consumers2 ),
134
+ ? assertMatch ([DisconnectedConsumer ],
135
+ Consumers1 -- Consumers2 ),
136
+
137
+ % % assert the cancelled consumer received a metadata update frame
138
+ SubIdToState1 =
139
+ maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
140
+ C1 = receive_metadata_update (S0 , C0 ),
141
+ Acc #{K => {S0 , C1 }};
142
+ (K , {S0 , C0 }, Acc ) ->
143
+ Acc #{K => {S0 , C0 }}
144
+ end , #{}, SubIdToState0 ),
120
145
121
146
delete_stream (Config , S ),
122
147
123
- {_ , _ } = receive_commands (T , S0 , C0 ),
124
- {_ , _ } = receive_commands (T , S1 , C1 ),
125
- {_ , _ } = receive_commands (T , S2 , C2 ),
148
+ % % online consumers should receive a metadata update frame (stream deleted)
149
+ % % we unqueue the this frame before closing the connection
150
+ % % directly closing the connection of the cancelled consumer
151
+ maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
152
+ {_ , C1 } = receive_commands (S0 , C0 ),
153
+ {ok , _ } = stream_test_utils :close (S0 , C1 );
154
+ (_ , {S0 , C0 }) ->
155
+ {ok , _ } = stream_test_utils :close (S0 , C0 )
156
+ end , SubIdToState1 ),
126
157
127
- {ok , _ } = stream_test_utils :close (S0 , C0 ),
128
- {ok , _ } = stream_test_utils :close (S1 , C1 ),
129
- {ok , _ } = stream_test_utils :close (S2 , C2 ),
130
158
ok .
131
159
132
160
leader (Members ) ->
@@ -161,8 +189,8 @@ delete_stream(Config, St) ->
161
189
{ok , C1 } = stream_test_utils :delete_stream (S , C0 , St ),
162
190
{ok , _ } = stream_test_utils :close (S , C1 ).
163
191
164
- register_sac (S , C0 , St ) ->
165
- SacSubscribeFrame = request ({subscribe , 0 , St ,
192
+ register_sac (S , C0 , St , SubId ) ->
193
+ SacSubscribeFrame = request ({subscribe , SubId , St ,
166
194
first , 1 ,
167
195
#{<<" single-active-consumer" >> => <<" true" >>,
168
196
<<" name" >> => name ()}}),
@@ -174,11 +202,17 @@ register_sac(S, C0, St) ->
174
202
C1 .
175
203
176
204
receive_consumer_update (S , C0 ) ->
177
- {Cmd , C1 } = receive_commands (? TRSPT , S , C0 ),
205
+ {Cmd , C1 } = receive_commands (S , C0 ),
178
206
? assertMatch ({request , _CorrId , {consumer_update , _SubId , _Status }},
179
207
Cmd ),
180
208
C1 .
181
209
210
+ receive_metadata_update (S , C0 ) ->
211
+ {Cmd , C1 } = receive_commands (S , C0 ),
212
+ ? assertMatch ({metadata_update , _ , ? RESPONSE_CODE_STREAM_NOT_AVAILABLE },
213
+ Cmd ),
214
+ C1 .
215
+
182
216
unsubscribe (S , C0 ) ->
183
217
{ok , C1 } = stream_test_utils :unsubscribe (S , C0 , sub_id ()),
184
218
C1 .
@@ -204,6 +238,9 @@ request(Cmd) ->
204
238
request (CorrId , Cmd ) ->
205
239
rabbit_stream_core :frame ({request , CorrId , Cmd }).
206
240
241
+ receive_commands (S , C ) ->
242
+ receive_commands (? TRSPT , S , C ).
243
+
207
244
receive_commands (Transport , S , C ) ->
208
245
stream_test_utils :receive_stream_commands (Transport , S , C ).
209
246
0 commit comments