Skip to content

Commit 77ab3d7

Browse files
committed
work
1 parent b03fd8b commit 77ab3d7

19 files changed

+1715
-527
lines changed

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, co
161161
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message);
162162
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
163163
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
164+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
164165
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
165166
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
166167
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

Lines changed: 132 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,154 +2,220 @@
22

33
namespace NKafka {
44

5-
// savnik add db
6-
7-
const TString SELECT_STATE_AND_GENERATION = R"(
8-
--!syntax_v1
9-
DECLARE $ConsumerGroup AS Utf8;
10-
11-
SELECT state, generation
12-
FROM kafka_connect_groups
13-
WHERE consumer_group = $ConsumerGroup;
14-
)";
15-
165
const TString INSERT_NEW_GROUP = R"(
176
--!syntax_v1
187
DECLARE $ConsumerGroup AS Utf8;
198
DECLARE $Generation AS Uint64;
209
DECLARE $State AS Uint64;
10+
DECLARE $Database AS Utf8;
11+
DECLARE $Master AS Utf8;
2112
22-
INSERT INTO kafka_connect_groups
13+
INSERT INTO `/Root/.metadata/kafka_consumer_groups`
2314
(
2415
consumer_group,
2516
generation,
2617
state,
27-
current_generation_start_time
18+
database,
19+
last_heartbeat_time,
20+
master
2821
)
2922
VALUES
3023
(
3124
$ConsumerGroup,
3225
$Generation,
3326
$State,
34-
CurrentUtcDateTime()
27+
$Database,
28+
CurrentUtcDateTime(),
29+
$Master
3530
);
3631
)";
3732

3833
const TString UPDATE_GROUP = R"(
3934
--!syntax_v1
4035
DECLARE $ConsumerGroup AS Utf8;
41-
DECLARE $NewState AS Uint64;
42-
DECLARE $OldGeneration AS Uint64;
36+
DECLARE $State AS Uint64;
37+
DECLARE $Generation AS Uint64;
38+
DECLARE $Database AS Utf8;
39+
DECLARE $Master AS Utf8;
4340
44-
UPDATE kafka_connect_groups
41+
UPDATE `/Root/.metadata/kafka_consumer_groups`
4542
SET
46-
state = $NewState,
47-
generation = $OldGeneration + 1
48-
WHERE consumer_group = $ConsumerGroup;
43+
state = $State,
44+
generation = $Generation,
45+
last_heartbeat_time = CurrentUtcDateTime(),
46+
master = $Master
47+
WHERE consumer_group = $ConsumerGroup
48+
AND database = $Database;
4949
)";
5050

51-
const TString SELECT_MASTER = R"(
51+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"(
5252
--!syntax_v1
5353
DECLARE $ConsumerGroup AS Utf8;
54-
DECLARE $Generation AS Uint64;
54+
DECLARE $State AS Uint64;
55+
DECLARE $Database AS Utf8;
56+
DECLARE $Protocol AS Utf8;
5557
56-
SELECT member_id
57-
FROM kafka_connect_members
58+
UPDATE `/Root/.metadata/kafka_consumer_groups`
59+
SET
60+
state = $State,
61+
last_heartbeat_time = CurrentUtcDateTime(),
62+
protocol = $Protocol
5863
WHERE consumer_group = $ConsumerGroup
59-
AND generation = $Generation
60-
ORDER BY join_time
61-
LIMIT 1;
64+
AND database = $Database;
6265
)";
6366

64-
const TString INSERT_MEMBER_AND_SELECT_MASTER = R"(
67+
const TString INSERT_MEMBER = R"(
6568
--!syntax_v1
66-
DECLARE $ConsumerGroup AS Utf8;
67-
DECLARE $Generation AS Uint64;
68-
DECLARE $MemberId AS Utf8;
69-
DECLARE $WorkerState AS String;
69+
DECLARE $ConsumerGroup AS Utf8;
70+
DECLARE $Generation AS Uint64;
71+
DECLARE $MemberId AS Utf8;
72+
DECLARE $WorkerStateProto AS String;
73+
DECLARE $Database AS Utf8;
7074
71-
INSERT INTO kafka_connect_members (
75+
INSERT INTO `/Root/.metadata/kafka_consumer_members` (
7276
consumer_group,
7377
generation,
7478
member_id,
75-
join_time,
76-
hearbeat_deadline,
77-
worker_state
79+
last_heartbeat_time,
80+
worker_state_proto,
81+
database
7882
)
7983
VALUES (
8084
$ConsumerGroup,
8185
$Generation,
8286
$MemberId,
8387
CurrentUtcDateTime(),
84-
CurrentUtcDateTime() + Interval("PT5S"),
85-
$WorkerState
88+
$WorkerStateProto,
89+
$Database
8690
);
87-
88-
SELECT member_id AS master_id
89-
FROM kafka_connect_members
90-
WHERE consumer_group = $ConsumerGroup
91-
AND generation = $Generation
92-
ORDER BY join_time
93-
LIMIT 1;
9491
)";
9592

96-
97-
// savnik Леша говорил про пагинацию
98-
9993
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"(
10094
--!syntax_v1
10195
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
10296
DECLARE $ConsumerGroup AS Utf8;
97+
DECLARE $Database AS Utf8;
98+
DECLARE $Generation AS Uint64;
99+
DECLARE $State AS Uint64;
103100
104-
UPSERT INTO kafka_connect_members
101+
UPSERT INTO `/Root/.metadata/kafka_consumer_members`
105102
SELECT
106103
item.MemberId AS member_id,
107104
item.Assignment AS assignment,
108-
$ConsumerGroup AS consumer_group
105+
$ConsumerGroup AS consumer_group,
106+
$Database AS database,
107+
$Generation AS generation
109108
FROM AS_TABLE($Assignments) AS item;
110109
111-
UPDATE kafka_connect_groups
112-
SET state = 2
113-
WHERE consumer_group = $ConsumerGroup;
114-
110+
UPDATE `/Root/.metadata/kafka_consumer_groups`
111+
SET
112+
state = $State,
113+
last_heartbeat_time = CurrentUtcDateTime()
114+
WHERE consumer_group = $ConsumerGroup
115+
AND database = $Database;
115116
)";
116117

117118
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"(
118119
--!syntax_v1
119120
DECLARE $ConsumerGroup AS Utf8;
120121
DECLARE $State AS Uint64;
121122
DECLARE $Generation AS Uint64;
123+
DECLARE $Database AS Utf8;
122124
123-
UPDATE kafka_connect_groups
124-
SET state = $State
125-
WHERE consumer_group = $ConsumerGroup;
125+
UPDATE `/Root/.metadata/kafka_consumer_groups`
126+
SET
127+
state = $State,
128+
last_heartbeat_time = CurrentUtcDateTime()
129+
WHERE consumer_group = $ConsumerGroup
130+
AND database = $Database;
126131
127-
SELECT worker_state
128-
FROM kafka_connect_members
132+
SELECT worker_state_proto, member_id
133+
FROM `/Root/.metadata/kafka_consumer_members`
129134
WHERE consumer_group = $ConsumerGroup
130-
AND generation = $Generation;
135+
AND generation = $Generation
136+
AND database = $Database;
131137
)";
132138

133139
const TString CHECK_GROUP_STATE = R"(
134140
--!syntax_v1
135141
DECLARE $ConsumerGroup AS Utf8;
142+
DECLARE $Database AS Utf8;
136143
137-
SELECT state, generation
138-
FROM kafka_connect_groups
139-
WHERE consumer_group = $ConsumerGroup;
144+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database
145+
FROM `/Root/.metadata/kafka_consumer_groups`
146+
WHERE consumer_group = $ConsumerGroup
147+
AND database = $Database;
140148
)";
141149

142-
const TString FETCH_ASSIGNMENT = R"(
150+
const TString FETCH_ASSIGNMENTS = R"(
143151
--!syntax_v1
144152
DECLARE $ConsumerGroup AS Utf8;
145153
DECLARE $Generation AS Uint64;
146154
DECLARE $MemberId AS Utf8;
155+
DECLARE $Database AS Utf8;
147156
148157
SELECT assignment
149-
FROM kafka_connect_members
158+
FROM `/Root/.metadata/kafka_consumer_members`
159+
WHERE consumer_group = $ConsumerGroup
160+
AND generation = $Generation
161+
AND member_id = $MemberId
162+
AND database = $Database;
163+
)";
164+
165+
const TString CHECK_DEAD_MEMBERS = R"(
166+
--!syntax_v1
167+
DECLARE $ConsumerGroup AS Utf8;
168+
DECLARE $Generation AS Uint64;
169+
DECLARE $Database AS Utf8;
170+
DECLARE $Deadline AS Datetime;
171+
172+
SELECT COUNT(1) as cnt
173+
FROM `/Root/.metadata/kafka_consumer_members`
150174
WHERE consumer_group = $ConsumerGroup
151175
AND generation = $Generation
152-
AND member_id = $MemberId;
176+
AND database = $Database
177+
AND last_heartbeat_time < $Deadline;
153178
)";
154179

180+
const TString UPDATE_TTLS = R"(
181+
--!syntax_v1
182+
DECLARE $ConsumerGroup AS Utf8;
183+
DECLARE $Generation AS Uint64;
184+
DECLARE $MemberId AS Utf8;
185+
DECLARE $Database AS Utf8;
186+
DECLARE $HeartbeatDeadline AS Datetime;
187+
DECLARE $UpdateGroupHeartbeat AS Bool;
188+
189+
UPDATE `/Root/.metadata/kafka_consumer_groups`
190+
SET last_heartbeat_time = CurrentUtcDateTime()
191+
WHERE consumer_group = $ConsumerGroup
192+
AND database = $Database
193+
AND $UpdateGroupHeartbeat = True;
194+
195+
UPDATE `/Root/.metadata/kafka_consumer_members`
196+
SET last_heartbeat_time = $HeartbeatDeadline
197+
WHERE consumer_group = $ConsumerGroup
198+
AND generation = $Generation
199+
AND member_id = $MemberId
200+
AND database = $Database;
201+
)";
202+
203+
204+
const TString UPDATE_TTL_LEAVE_GROUP = R"(
205+
--!syntax_v1
206+
DECLARE $ConsumerGroup AS Utf8;
207+
DECLARE $MemberId AS Utf8;
208+
DECLARE $Database AS Utf8;
209+
210+
UPDATE `/Root/.metadata/kafka_consumer_members`
211+
SET last_heartbeat_time = CurrentUtcDateTime() - Interval("PT1H")
212+
WHERE consumer_group = $ConsumerGroup
213+
AND member_id = $MemberId
214+
AND database = $Database;
215+
)";
216+
217+
155218
} // namespace NKafka
219+
220+
221+
// savnik check max members count

0 commit comments

Comments
 (0)