@@ -58,6 +58,19 @@ const TString UPDATE_GROUP = R"sql(
58
58
AND consumer_group = $ConsumerGroup;
59
59
)sql" ;
60
60
61
+ const TString UPDATE_GROUP_STATE = R"sql(
62
+ --!syntax_v1
63
+ DECLARE $ConsumerGroup AS Utf8;
64
+ DECLARE $Database AS Utf8;
65
+ DECLARE $State AS Uint64;
66
+
67
+ UPDATE `%s`
68
+ SET
69
+ state = $State
70
+ WHERE database = $Database
71
+ AND consumer_group = $ConsumerGroup;
72
+ )sql" ;
73
+
61
74
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
62
75
--!syntax_v1
63
76
DECLARE $ConsumerGroup AS Utf8;
@@ -184,16 +197,16 @@ const TString CHECK_DEAD_MEMBERS = R"sql(
184
197
DECLARE $ConsumerGroup AS Utf8;
185
198
DECLARE $Generation AS Uint64;
186
199
DECLARE $Database AS Utf8;
187
- DECLARE $Deadline AS Datetime;
188
200
DECLARE $MemberId AS Utf8;
189
201
190
- SELECT COUNT(1) as cnt
202
+ SELECT heartbeat_deadline
191
203
FROM `%s`
192
204
VIEW idx_group_generation_db_hb
193
205
WHERE database = $Database
194
- AND consumer_group = $ConsumerGroup
195
- AND generation = $Generation
196
- AND heartbeat_deadline < $Deadline;
206
+ AND consumer_group = $ConsumerGroup
207
+ AND generation = $Generation
208
+ ORDER BY heartbeat_deadline
209
+ LIMIT 1;
197
210
198
211
SELECT session_timeout_ms
199
212
FROM `%s`
@@ -246,8 +259,12 @@ const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
246
259
247
260
const TString CHECK_GROUPS_COUNT = R"sql(
248
261
--!syntax_v1
262
+ DECLARE $GroupsCountCheckDeadline AS Datetime;
263
+
249
264
SELECT COUNT(1) as groups_count
250
265
FROM `%s`
266
+ VIEW idx_last_hb
267
+ WHERE last_heartbeat_time > $GroupsCountCheckDeadline;
251
268
)sql" ;
252
269
253
270
} // namespace NKafka
0 commit comments