2
2
3
3
namespace NKafka {
4
4
5
- // savnik add db
6
-
7
- const TString SELECT_STATE_AND_GENERATION = R"(
8
- --!syntax_v1
9
- DECLARE $ConsumerGroup AS Utf8;
10
-
11
- SELECT state, generation
12
- FROM kafka_connect_groups
13
- WHERE consumer_group = $ConsumerGroup;
14
- )" ;
15
-
16
5
const TString INSERT_NEW_GROUP = R"(
17
6
--!syntax_v1
18
7
DECLARE $ConsumerGroup AS Utf8;
19
8
DECLARE $Generation AS Uint64;
20
9
DECLARE $State AS Uint64;
10
+ DECLARE $Database AS Utf8;
11
+ DECLARE $Master AS Utf8;
21
12
22
- INSERT INTO kafka_connect_groups
13
+ INSERT INTO `/Root/.metadata/kafka_consumer_groups`
23
14
(
24
15
consumer_group,
25
16
generation,
26
17
state,
27
- current_generation_start_time
18
+ database,
19
+ last_heartbeat_time,
20
+ master
28
21
)
29
22
VALUES
30
23
(
31
24
$ConsumerGroup,
32
25
$Generation,
33
26
$State,
34
- CurrentUtcDateTime()
27
+ $Database,
28
+ CurrentUtcDateTime(),
29
+ $Master
35
30
);
36
31
)" ;
37
32
38
33
const TString UPDATE_GROUP = R"(
39
34
--!syntax_v1
40
35
DECLARE $ConsumerGroup AS Utf8;
41
- DECLARE $NewState AS Uint64;
42
- DECLARE $OldGeneration AS Uint64;
36
+ DECLARE $State AS Uint64;
37
+ DECLARE $Generation AS Uint64;
38
+ DECLARE $Database AS Utf8;
39
+ DECLARE $Master AS Utf8;
43
40
44
- UPDATE kafka_connect_groups
41
+ UPDATE `/Root/.metadata/kafka_consumer_groups`
45
42
SET
46
- state = $NewState,
47
- generation = $OldGeneration + 1
48
- WHERE consumer_group = $ConsumerGroup;
43
+ state = $State,
44
+ generation = $Generation,
45
+ last_heartbeat_time = CurrentUtcDateTime(),
46
+ master = $Master
47
+ WHERE consumer_group = $ConsumerGroup
48
+ AND database = $Database;
49
49
)" ;
50
50
51
- const TString SELECT_MASTER = R"(
51
+ const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"(
52
52
--!syntax_v1
53
53
DECLARE $ConsumerGroup AS Utf8;
54
- DECLARE $Generation AS Uint64;
54
+ DECLARE $State AS Uint64;
55
+ DECLARE $Database AS Utf8;
56
+ DECLARE $Protocol AS Utf8;
55
57
56
- SELECT member_id
57
- FROM kafka_connect_members
58
+ UPDATE `/Root/.metadata/kafka_consumer_groups`
59
+ SET
60
+ state = $State,
61
+ last_heartbeat_time = CurrentUtcDateTime(),
62
+ protocol = $Protocol
58
63
WHERE consumer_group = $ConsumerGroup
59
- AND generation = $Generation
60
- ORDER BY join_time
61
- LIMIT 1;
64
+ AND database = $Database;
62
65
)" ;
63
66
64
- const TString INSERT_MEMBER_AND_SELECT_MASTER = R"(
67
+ const TString INSERT_MEMBER = R"(
65
68
--!syntax_v1
66
- DECLARE $ConsumerGroup AS Utf8;
67
- DECLARE $Generation AS Uint64;
68
- DECLARE $MemberId AS Utf8;
69
- DECLARE $WorkerState AS String;
69
+ DECLARE $ConsumerGroup AS Utf8;
70
+ DECLARE $Generation AS Uint64;
71
+ DECLARE $MemberId AS Utf8;
72
+ DECLARE $WorkerStateProto AS String;
73
+ DECLARE $Database AS Utf8;
70
74
71
- INSERT INTO kafka_connect_members (
75
+ INSERT INTO `/Root/.metadata/kafka_consumer_members` (
72
76
consumer_group,
73
77
generation,
74
78
member_id,
75
- join_time ,
76
- hearbeat_deadline ,
77
- worker_state
79
+ last_heartbeat_time ,
80
+ worker_state_proto ,
81
+ database
78
82
)
79
83
VALUES (
80
84
$ConsumerGroup,
81
85
$Generation,
82
86
$MemberId,
83
87
CurrentUtcDateTime(),
84
- CurrentUtcDateTime() + Interval("PT5S") ,
85
- $WorkerState
88
+ $WorkerStateProto ,
89
+ $Database
86
90
);
87
-
88
- SELECT member_id AS master_id
89
- FROM kafka_connect_members
90
- WHERE consumer_group = $ConsumerGroup
91
- AND generation = $Generation
92
- ORDER BY join_time
93
- LIMIT 1;
94
91
)" ;
95
92
96
-
97
- // savnik Леша говорил про пагинацию
98
-
99
93
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"(
100
94
--!syntax_v1
101
95
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
102
96
DECLARE $ConsumerGroup AS Utf8;
97
+ DECLARE $Database AS Utf8;
98
+ DECLARE $Generation AS Uint64;
99
+ DECLARE $State AS Uint64;
103
100
104
- UPSERT INTO kafka_connect_members
101
+ UPSERT INTO `/Root/.metadata/kafka_consumer_members`
105
102
SELECT
106
103
item.MemberId AS member_id,
107
104
item.Assignment AS assignment,
108
- $ConsumerGroup AS consumer_group
105
+ $ConsumerGroup AS consumer_group,
106
+ $Database AS database,
107
+ $Generation AS generation
109
108
FROM AS_TABLE($Assignments) AS item;
110
109
111
- UPDATE kafka_connect_groups
112
- SET state = 2
113
- WHERE consumer_group = $ConsumerGroup;
114
-
110
+ UPDATE `/Root/.metadata/kafka_consumer_groups`
111
+ SET
112
+ state = $State,
113
+ last_heartbeat_time = CurrentUtcDateTime()
114
+ WHERE consumer_group = $ConsumerGroup
115
+ AND database = $Database;
115
116
)" ;
116
117
117
118
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"(
118
119
--!syntax_v1
119
120
DECLARE $ConsumerGroup AS Utf8;
120
121
DECLARE $State AS Uint64;
121
122
DECLARE $Generation AS Uint64;
123
+ DECLARE $Database AS Utf8;
122
124
123
- UPDATE kafka_connect_groups
124
- SET state = $State
125
- WHERE consumer_group = $ConsumerGroup;
125
+ UPDATE `/Root/.metadata/kafka_consumer_groups`
126
+ SET
127
+ state = $State,
128
+ last_heartbeat_time = CurrentUtcDateTime()
129
+ WHERE consumer_group = $ConsumerGroup
130
+ AND database = $Database;
126
131
127
- SELECT worker_state
128
- FROM kafka_connect_members
132
+ SELECT worker_state_proto, member_id
133
+ FROM `/Root/.metadata/kafka_consumer_members`
129
134
WHERE consumer_group = $ConsumerGroup
130
- AND generation = $Generation;
135
+ AND generation = $Generation
136
+ AND database = $Database;
131
137
)" ;
132
138
133
139
const TString CHECK_GROUP_STATE = R"(
134
140
--!syntax_v1
135
141
DECLARE $ConsumerGroup AS Utf8;
142
+ DECLARE $Database AS Utf8;
136
143
137
- SELECT state, generation
138
- FROM kafka_connect_groups
139
- WHERE consumer_group = $ConsumerGroup;
144
+ SELECT state, generation, master, last_heartbeat_time, consumer_group, database
145
+ FROM `/Root/.metadata/kafka_consumer_groups`
146
+ WHERE consumer_group = $ConsumerGroup
147
+ AND database = $Database;
140
148
)" ;
141
149
142
- const TString FETCH_ASSIGNMENT = R"(
150
+ const TString FETCH_ASSIGNMENTS = R"(
143
151
--!syntax_v1
144
152
DECLARE $ConsumerGroup AS Utf8;
145
153
DECLARE $Generation AS Uint64;
146
154
DECLARE $MemberId AS Utf8;
155
+ DECLARE $Database AS Utf8;
147
156
148
157
SELECT assignment
149
- FROM kafka_connect_members
158
+ FROM `/Root/.metadata/kafka_consumer_members`
159
+ WHERE consumer_group = $ConsumerGroup
160
+ AND generation = $Generation
161
+ AND member_id = $MemberId
162
+ AND database = $Database;
163
+ )" ;
164
+
165
+ const TString CHECK_DEAD_MEMBERS = R"(
166
+ --!syntax_v1
167
+ DECLARE $ConsumerGroup AS Utf8;
168
+ DECLARE $Generation AS Uint64;
169
+ DECLARE $Database AS Utf8;
170
+ DECLARE $Deadline AS Datetime;
171
+
172
+ SELECT COUNT(1) as cnt
173
+ FROM `/Root/.metadata/kafka_consumer_members`
150
174
WHERE consumer_group = $ConsumerGroup
151
175
AND generation = $Generation
152
- AND member_id = $MemberId;
176
+ AND database = $Database
177
+ AND last_heartbeat_time < $Deadline;
153
178
)" ;
154
179
180
+ const TString UPDATE_TTLS = R"(
181
+ --!syntax_v1
182
+ DECLARE $ConsumerGroup AS Utf8;
183
+ DECLARE $Generation AS Uint64;
184
+ DECLARE $MemberId AS Utf8;
185
+ DECLARE $Database AS Utf8;
186
+ DECLARE $HeartbeatDeadline AS Datetime;
187
+ DECLARE $UpdateGroupHeartbeat AS Bool;
188
+
189
+ UPDATE `/Root/.metadata/kafka_consumer_groups`
190
+ SET last_heartbeat_time = CurrentUtcDateTime()
191
+ WHERE consumer_group = $ConsumerGroup
192
+ AND database = $Database
193
+ AND $UpdateGroupHeartbeat = True;
194
+
195
+ UPDATE `/Root/.metadata/kafka_consumer_members`
196
+ SET last_heartbeat_time = $HeartbeatDeadline
197
+ WHERE consumer_group = $ConsumerGroup
198
+ AND generation = $Generation
199
+ AND member_id = $MemberId
200
+ AND database = $Database;
201
+ )" ;
202
+
203
+
204
+ const TString UPDATE_TTL_LEAVE_GROUP = R"(
205
+ --!syntax_v1
206
+ DECLARE $ConsumerGroup AS Utf8;
207
+ DECLARE $MemberId AS Utf8;
208
+ DECLARE $Database AS Utf8;
209
+
210
+ UPDATE `/Root/.metadata/kafka_consumer_members`
211
+ SET last_heartbeat_time = CurrentUtcDateTime() - Interval("PT1H")
212
+ WHERE consumer_group = $ConsumerGroup
213
+ AND member_id = $MemberId
214
+ AND database = $Database;
215
+ )" ;
216
+
217
+
155
218
} // namespace NKafka
219
+
220
+
221
+ // savnik check max members count
0 commit comments