@@ -58,12 +58,14 @@ const TString UPDATE_GROUP_STATE = R"sql(
58
58
DECLARE $ConsumerGroup AS Utf8;
59
59
DECLARE $Database AS Utf8;
60
60
DECLARE $State AS Uint64;
61
+ DECLARE $Generation AS Uint64;
61
62
62
63
UPDATE `%s`
63
64
SET
64
65
state = $State
65
66
WHERE database = $Database
66
- AND consumer_group = $ConsumerGroup;
67
+ AND consumer_group = $ConsumerGroup
68
+ AND generation = $Generation;
67
69
)sql" ;
68
70
69
71
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
@@ -93,6 +95,7 @@ const TString INSERT_MEMBER = R"sql(
93
95
DECLARE $Database AS Utf8;
94
96
DECLARE $HeartbeatDeadline AS Datetime;
95
97
DECLARE $SessionTimeoutMs AS Uint32;
98
+ DECLARE $RebalanceTimeoutMs AS Uint32;
96
99
97
100
INSERT INTO `%s`
98
101
(
@@ -103,7 +106,8 @@ const TString INSERT_MEMBER = R"sql(
103
106
heartbeat_deadline,
104
107
worker_state_proto,
105
108
database,
106
- session_timeout_ms
109
+ session_timeout_ms,
110
+ rebalance_timeout_ms
107
111
)
108
112
VALUES (
109
113
$ConsumerGroup,
@@ -113,7 +117,8 @@ const TString INSERT_MEMBER = R"sql(
113
117
$HeartbeatDeadline,
114
118
$WorkerStateProto,
115
119
$Database,
116
- $SessionTimeoutMs
120
+ $SessionTimeoutMs,
121
+ $RebalanceTimeoutMs
117
122
);
118
123
)sql" ;
119
124
@@ -217,15 +222,15 @@ const TString CHECK_DEAD_MEMBERS = R"sql(
217
222
DECLARE $Generation AS Uint64;
218
223
DECLARE $Database AS Utf8;
219
224
DECLARE $MemberId AS Utf8;
225
+ DECLARE $Now AS Datetime;
220
226
221
- SELECT heartbeat_deadline
227
+ SELECT COUNT(1) deads_cnt
222
228
FROM `%s`
223
229
VIEW idx_group_generation_db_hb
224
230
WHERE database = $Database
225
231
AND consumer_group = $ConsumerGroup
226
232
AND generation = $Generation
227
- ORDER BY heartbeat_deadline
228
- LIMIT 1;
233
+ AND heartbeat_deadline < $Now;
229
234
230
235
SELECT session_timeout_ms
231
236
FROM `%s`
@@ -237,7 +242,7 @@ const TString CHECK_DEAD_MEMBERS = R"sql(
237
242
238
243
)sql" ;
239
244
240
- const TString UPDATE_LAST_HEARTBEATS = R"sql(
245
+ const TString UPDATE_LAST_MEMBER_AND_GROUP_HEARTBEATS = R"sql(
241
246
--!syntax_v1
242
247
DECLARE $ConsumerGroup AS Utf8;
243
248
DECLARE $Generation AS Uint64;
@@ -262,20 +267,80 @@ const TString UPDATE_LAST_HEARTBEATS = R"sql(
262
267
AND database = $Database;
263
268
)sql" ;
264
269
270
+ const TString UPDATE_LAST_MEMBER_HEARTBEAT = R"sql(
271
+ --!syntax_v1
272
+ DECLARE $ConsumerGroup AS Utf8;
273
+ DECLARE $Generation AS Uint64;
274
+ DECLARE $MemberId AS Utf8;
275
+ DECLARE $Database AS Utf8;
276
+ DECLARE $HeartbeatDeadline AS Datetime;
277
+
278
+ UPDATE `%s`
279
+ SET heartbeat_deadline = $HeartbeatDeadline
280
+ WHERE consumer_group = $ConsumerGroup
281
+ AND generation = $Generation
282
+ AND member_id = $MemberId
283
+ AND database = $Database;
284
+ )sql" ;
265
285
266
- const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
286
+ const TString CHECK_MASTER_ALIVE = R"sql(
287
+ --!syntax_v1
288
+ DECLARE $ConsumerGroup AS Utf8;
289
+ DECLARE $MasterId AS Utf8;
290
+ DECLARE $Database AS Utf8;
291
+ DECLARE $Generation AS Uint64;
292
+ DECLARE $Now AS Datetime;
293
+
294
+ SELECT COUNT(1) allive,
295
+ FROM `%s`
296
+ VIEW PRIMARY KEY
297
+ WHERE database = $Database
298
+ AND consumer_group = $ConsumerGroup
299
+ AND generation = $Generation
300
+ AND member_id = $MasterId
301
+ AND heartbeat_deadline > $Now;
302
+ )sql" ;
303
+
304
+ const TString GET_GENERATION_BY_MEMBER = R"sql(
267
305
--!syntax_v1
268
306
DECLARE $ConsumerGroup AS Utf8;
269
307
DECLARE $MemberId AS Utf8;
270
308
DECLARE $Database AS Utf8;
309
+
310
+ SELECT generation
311
+ FROM `%s`
312
+ VIEW PRIMARY KEY
313
+ WHERE database = $Database
314
+ AND consumer_group = $ConsumerGroup
315
+ AND member_id = $MemberId
316
+ ORDER BY generation DESC
317
+ LIMIT 1;
318
+ )sql" ;
319
+
320
+ const TString UPDATE_LAST_HEARTBEAT_AND_STATE_TO_LEAVE_GROUP = R"sql(
321
+ --!syntax_v1
322
+ DECLARE $ConsumerGroup AS Utf8;
323
+ DECLARE $MemberId AS Utf8;
324
+ DECLARE $Database AS Utf8;
325
+ DECLARE $Generation AS Uint64;
271
326
DECLARE $LastMasterHeartbeat AS Datetime;
327
+ DECLARE $State AS Uint64;
328
+ DECLARE $UpdateState AS Bool;
272
329
273
330
UPDATE `%s`
274
331
SET heartbeat_deadline = $LastMasterHeartbeat,
275
332
leaved = True
276
- WHERE consumer_group = $ConsumerGroup
277
- AND member_id = $MemberId
278
- AND database = $Database;
333
+ WHERE database = $Database
334
+ AND consumer_group = $ConsumerGroup
335
+ AND generation = $Generation
336
+ AND member_id = $MemberId;
337
+
338
+ UPDATE `%s`
339
+ SET
340
+ state = $State
341
+ WHERE database = $Database
342
+ AND consumer_group = $ConsumerGroup
343
+ AND $UpdateState = True;
279
344
)sql" ;
280
345
281
346
const TString CHECK_GROUPS_COUNT = R"sql(
0 commit comments