Skip to content

Commit 7a3575f

Browse files
[Kafka API] Add kafka transaction actor (part of kafka transactions epic) (#17141)
1 parent 9652ef7 commit 7a3575f

17 files changed

+1454
-117
lines changed

ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,10 @@ namespace NKafka {
284284

285285
Send(NKafka::MakeKafkaTransactionsServiceID(), new TEvKafka::TEvSaveTxnProducerRequest(
286286
producerState.TransactionalId,
287-
producerState.ProducerId,
288-
producerState.ProducerEpoch
287+
{
288+
producerState.ProducerId,
289+
producerState.ProducerEpoch
290+
}
289291
));
290292
}
291293

ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp

Lines changed: 434 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 122 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,144 @@
11
#pragma once
22

3+
#include "kafka_init_producer_id_actor.h"
4+
#include <ydb/core/kafka_proxy/kafka_events.h>
35
#include <ydb/library/actors/core/actor_bootstrapped.h>
6+
#include <ydb/core/kafka_proxy/kqp_helper.h>
47

58
namespace NKafka {
69
/*
710
This class is responsible for one kafka transaction.
811
912
It accumulates transaction state (partitions in tx, offsets) and on commit submits transaction to KQP
1013
*/
11-
class TKafkaTransactionActor : public NActors::TActorBootstrapped<TKafkaTransactionActor> {
14+
class TKafkaTransactionActor : public NActors::TActor<TKafkaTransactionActor> {
1215

13-
using TBase = NActors::TActorBootstrapped<TKafkaTransactionActor>;
16+
using TBase = NActors::TActor<TKafkaTransactionActor>;
1417

1518
public:
16-
void Bootstrap(const NActors::TActorContext&) {
17-
TBase::Become(&TKafkaTransactionActor::StateWork);
18-
}
19+
struct TTopicPartition {
20+
TString TopicPath;
21+
ui32 PartitionId;
22+
23+
bool operator==(const TTopicPartition &other) const = default;
24+
};
25+
26+
struct TopicPartitionHashFn {
27+
size_t operator()(const TTopicPartition& partition) const {
28+
return std::hash<TString>()(partition.TopicPath) ^ std::hash<int64_t>()(partition.PartitionId);
29+
}
30+
};
31+
32+
struct TPartitionCommit {
33+
i32 Partition;
34+
i64 Offset;
35+
TString ConsumerName;
36+
i64 ConsumerGeneration;
37+
};
38+
39+
enum EKafkaTxnKqpRequests : ui8 {
40+
NO_REQUEST = 0,
41+
42+
// This request selects up-to-date producer and consumers states from relevant tables
43+
// After this request we check that no transaction details have expired.
44+
SELECT,
45+
// This request sends to KQP a command to commit transaction
46+
// Both of these requests happen in the same transaction
47+
COMMIT
48+
};
49+
50+
// we need to exlplicitly specify kqpActorId and txnCoordinatorActorId for unit tests
51+
TKafkaTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) :
52+
TActor<TKafkaTransactionActor>(&TKafkaTransactionActor::StateFunc),
53+
TransactionalId(transactionalId),
54+
ProducerInstanceId({producerId, producerEpoch}),
55+
DatabasePath(DatabasePath),
56+
TxnCoordinatorActorId(txnCoordinatorActorId),
57+
KqpActorId(kqpActorId) {};
1958

2059
TStringBuilder LogPrefix() const {
21-
return TStringBuilder() << "KafkaTransactionActor";
60+
return TStringBuilder() << "KafkaTransactionActor{TransactionalId=" << TransactionalId << "; ProducerId=" << ProducerInstanceId.Id << "; ProducerEpoch=" << ProducerInstanceId.Epoch << "}: ";
2261
}
2362

2463
private:
25-
STFUNC(StateWork) {
26-
switch (ev->GetTypeRewrite()) {
27-
// will be eimplemented in a future PR
28-
// ToDo: add poison pill handler
64+
STFUNC(StateFunc) {
65+
try {
66+
switch (ev->GetTypeRewrite()) {
67+
HFunc(TEvKafka::TEvAddPartitionsToTxnRequest, Handle);
68+
HFunc(TEvKafka::TEvAddOffsetsToTxnRequest, Handle);
69+
HFunc(TEvKafka::TEvTxnOffsetCommitRequest, Handle);
70+
HFunc(TEvKafka::TEvEndTxnRequest, Handle);
71+
HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
72+
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
73+
HFunc(TEvents::TEvPoison, Handle);
74+
}
75+
} catch (const yexception& y) {
76+
KAFKA_LOG_CRIT(TStringBuilder() << "Critical error happened. Reason: " << y.what());
77+
Die(ActorContext());
2978
}
3079
}
80+
81+
// Kafka API events
82+
void Handle(TEvKafka::TEvAddPartitionsToTxnRequest::TPtr& ev, const TActorContext& ctx);
83+
void Handle(TEvKafka::TEvAddOffsetsToTxnRequest::TPtr& ev, const TActorContext& ctx);
84+
void Handle(TEvKafka::TEvTxnOffsetCommitRequest::TPtr& ev, const TActorContext& ctx);
85+
void Handle(TEvKafka::TEvEndTxnRequest::TPtr& ev, const TActorContext& ctx);
86+
// KQP events
87+
void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx);
88+
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
89+
90+
// Poison pill
91+
void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx);
92+
93+
// Transaction commit logic
94+
void StartKqpSession(const TActorContext& ctx);
95+
void SendToKqpValidationRequests(const TActorContext& ctx);
96+
void SendCommitTxnRequest(const TString& kqpTransactionId);
97+
98+
// Response senders
99+
template<class ErrorResponseType, class EventType>
100+
void SendFailResponse(TAutoPtr<TEventHandle<EventType>>& evHandle, EKafkaErrors errorCode, const TString& errorMessage = {});
101+
template<class ResponseType, class EventType>
102+
void SendOkResponse(TAutoPtr<TEventHandle<EventType>>& evHandle);
103+
104+
// helper methods
105+
void Die(const TActorContext &ctx);
106+
template<class EventType>
107+
bool ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest);
108+
TString GetFullTopicPath(const TString& topicName);
109+
TString GetYqlWithTablesNames(const TString& templateStr);
110+
NYdb::TParams BuildSelectParams();
111+
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildCommitTxnRequestToKqp(const TString& kqpTransactionId);
112+
void HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx);
113+
void HandleCommitResponse(const TActorContext& ctx);
114+
TMaybe<TString> GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev);
115+
TMaybe<TProducerState> ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response);
116+
TMaybe<TString> GetErrorInProducerState(const TMaybe<TProducerState>& producerState);
117+
std::unordered_map<TString, i32> ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response);
118+
TMaybe<TString> GetErrorInConsumersStates(const std::unordered_map<TString, i32>& consumerGenerationByName);
119+
TString GetAsStr(EKafkaTxnKqpRequests request);
120+
121+
// data from fields below will be sent to KQP on END_TXN request
122+
std::unordered_map<TTopicPartition, TPartitionCommit, TopicPartitionHashFn> OffsetsToCommit = {};
123+
std::unordered_set<TTopicPartition, TopicPartitionHashFn> PartitionsInTxn = {};
124+
const TString TransactionalId;
125+
const TEvKafka::TProducerInstanceId ProducerInstanceId;
126+
// const i64 ProducerId;
127+
// const i16 ProducerEpoch;
128+
129+
// helper fields
130+
const TString DatabasePath;
131+
// This field is needed to preserve request details between several requests to KQP
132+
// In case something goes off road, we can always send error back to client
133+
TAutoPtr<TEventHandle<TEvKafka::TEvEndTxnRequest>> EndTxnRequestPtr;
134+
bool CommitStarted = false;
135+
const TActorId TxnCoordinatorActorId;
136+
137+
// communication with KQP
138+
std::unique_ptr<TKqpTxHelper> Kqp;
139+
TActorId KqpActorId;
140+
TString KqpSessionId;
141+
ui64 KqpCookie = 0;
142+
// Initialized to NO_REQUEST so that it never holds an indeterminate value before the first assignment.
EKafkaTxnKqpRequests LastSentToKqpRequest = NO_REQUEST;
31143
};
32144
} // namespace NKafka
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#include <util/generic/string.h>
2+
3+
namespace NKafka::NKafkaTransactionSql {
4+
5+
// YQL template with two SELECT statements: the first reads the producer state row for
// ($Database, $TransactionalId), the second reads the maximum generation per consumer group
// for the groups listed in $ConsumerGroups.
// <producer_state_table_name> and <consumer_state_table_name> are placeholders that must be
// replaced with real table names before the query is sent.
//
// The variable is declared elsewhere as `extern const TString`, so the definition must be
// const as well. `extern` is required here because a namespace-scope const object would
// otherwise get internal linkage in this translation unit.
extern const TString SELECT_FOR_VALIDATION = R"sql(
--!syntax_v1
DECLARE $Database AS Utf8;
DECLARE $TransactionalId AS Utf8;
DECLARE $ConsumerGroups AS List<Utf8>;

SELECT * FROM `<producer_state_table_name>`
WHERE database = $Database
AND transactional_id = $TransactionalId;

SELECT consumer_group, MAX(generation) FROM `<consumer_state_table_name>`
VIEW PRIMARY KEY
WHERE database = $Database
AND consumer_group IN COMPACT ($ConsumerGroups)
GROUP BY consumer_group;
)sql";
21+
22+
} // namespace NKafka::NKafkaTransactionSql
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include <util/generic/fwd.h>
4+
5+
namespace NKafka::NKafkaTransactionSql {
6+
7+
constexpr ui32 PRODUCER_STATE_REQUEST_INDEX = 0;
8+
constexpr ui32 CONSUMER_STATES_REQUEST_INDEX = 1;
9+
10+
extern const TString SELECT_FOR_VALIDATION;
11+
12+
} // namespace NKafka::NKafkaTransactionSql
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#include "txn_actor_response_builder.h"
2+
3+
namespace NKafka::NKafkaTransactions {
4+
template<class ResponseType, class RequestType>
5+
std::shared_ptr<ResponseType> BuildResponse(TMessagePtr<RequestType> request, EKafkaErrors errorCode) {
6+
Y_UNUSED(request); // used in other template functions
7+
auto response = std::make_shared<ResponseType>();
8+
response->ErrorCode = errorCode;
9+
return response;
10+
};
11+
12+
template std::shared_ptr<TAddOffsetsToTxnResponseData> BuildResponse<TAddOffsetsToTxnResponseData, TAddOffsetsToTxnRequestData>(TMessagePtr<TAddOffsetsToTxnRequestData> request, EKafkaErrors errorCode);
13+
14+
template std::shared_ptr<TEndTxnResponseData> BuildResponse<TEndTxnResponseData, TEndTxnRequestData>(TMessagePtr<TEndTxnRequestData> request, EKafkaErrors errorCode);
15+
16+
template<>
17+
std::shared_ptr<TAddPartitionsToTxnResponseData> BuildResponse<TAddPartitionsToTxnResponseData, TAddPartitionsToTxnRequestData>(TMessagePtr<TAddPartitionsToTxnRequestData> request, EKafkaErrors errorCode) {
18+
auto response = std::make_shared<TAddPartitionsToTxnResponseData>();
19+
std::vector<TAddPartitionsToTxnResponseData::TAddPartitionsToTxnTopicResult> topicsResponse;
20+
topicsResponse.reserve(request->Topics.size());
21+
for (const auto& requestTopic : request->Topics) {
22+
TAddPartitionsToTxnResponseData::TAddPartitionsToTxnTopicResult topicInResponse;
23+
topicInResponse.Name = requestTopic.Name;
24+
topicInResponse.Results.reserve(requestTopic.Partitions.size());
25+
for (const auto& requestPartition : requestTopic.Partitions) {
26+
TAddPartitionsToTxnResponseData::TAddPartitionsToTxnTopicResult::TAddPartitionsToTxnPartitionResult partitionInResponse;
27+
partitionInResponse.PartitionIndex = requestPartition;
28+
partitionInResponse.ErrorCode = errorCode;
29+
topicInResponse.Results.push_back(partitionInResponse);
30+
}
31+
topicsResponse.push_back(topicInResponse);
32+
}
33+
response->Results = std::move(topicsResponse);
34+
return response;
35+
};
36+
37+
template<>
38+
std::shared_ptr<TTxnOffsetCommitResponseData> BuildResponse<TTxnOffsetCommitResponseData, TTxnOffsetCommitRequestData>(TMessagePtr<TTxnOffsetCommitRequestData> request, EKafkaErrors errorCode) {
39+
auto response = std::make_shared<TTxnOffsetCommitResponseData>();
40+
std::vector<TTxnOffsetCommitResponseData::TTxnOffsetCommitResponseTopic> topicsResponse;
41+
topicsResponse.reserve(request->Topics.size());
42+
for (const auto& requestTopic : request->Topics) {
43+
TTxnOffsetCommitResponseData::TTxnOffsetCommitResponseTopic topicInResponse;
44+
topicInResponse.Name = requestTopic.Name;
45+
topicInResponse.Partitions.reserve(requestTopic.Partitions.size());
46+
for (const auto& requestPartition : requestTopic.Partitions) {
47+
TTxnOffsetCommitResponseData::TTxnOffsetCommitResponseTopic::TTxnOffsetCommitResponsePartition partitionInResponse;
48+
partitionInResponse.PartitionIndex = requestPartition.PartitionIndex;
49+
partitionInResponse.ErrorCode = errorCode;
50+
topicInResponse.Partitions.push_back(partitionInResponse);
51+
}
52+
topicsResponse.push_back(topicInResponse);
53+
}
54+
response->Topics = std::move(topicsResponse);;
55+
return response;
56+
};
57+
} // namespace NKafka::NKafkaTransactions
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#pragma once
2+
3+
#include <ydb/core/kafka_proxy/actors/actors.h>
4+
#include <ydb/core/kafka_proxy/kafka.h>
5+
#include <ydb/core/kafka_proxy/kafka_messages.h>
6+
7+
namespace NKafka::NKafkaTransactions {
8+
template<class ResponseType, class RequestType>
9+
std::shared_ptr<ResponseType> BuildResponse(TMessagePtr<RequestType> request, EKafkaErrors errorCode);
10+
} // namespace NKafka::NKafkaTransactions

ydb/core/kafka_proxy/kafka_events.h

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ struct TEvKafka {
3737
EvAddOffsetsToTxnRequest,
3838
EvTxnOffsetCommitRequest,
3939
EvEndTxnRequest,
40+
EvTransactionActorDied,
4041
EvResponse = EvRequest + 256,
4142
EvInternalEvents = EvResponse + 256,
4243
EvEnd
@@ -255,15 +256,17 @@ struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModifi
255256
};
256257

257258
struct TEvAddPartitionsToTxnRequest : public TEventLocal<TEvAddPartitionsToTxnRequest, EvAddPartitionsToTxnRequest> {
258-
TEvAddPartitionsToTxnRequest(const ui64 correlationId, const TMessagePtr<TAddPartitionsToTxnRequestData>& request, const TActorId connectionId)
259+
TEvAddPartitionsToTxnRequest(const ui64 correlationId, const TMessagePtr<TAddPartitionsToTxnRequestData>& request, const TActorId connectionId, const TString& databasePath)
259260
: CorrelationId(correlationId)
260261
, Request(request)
261262
, ConnectionId(connectionId)
263+
, DatabasePath(databasePath)
262264
{}
263265

264266
ui64 CorrelationId;
265267
const TMessagePtr<TAddPartitionsToTxnRequestData> Request;
266268
TActorId ConnectionId;
269+
TString DatabasePath;
267270
};
268271

269272
struct TEvTopicDescribeResponse : public NActors::TEventLocal<TEvTopicDescribeResponse, EvDescribeTopicsResponse>
@@ -286,54 +289,64 @@ struct TEvTopicDescribeResponse : public NActors::TEventLocal<TEvTopicDescribeRe
286289
};
287290

288291
struct TEvAddOffsetsToTxnRequest : public TEventLocal<TEvAddOffsetsToTxnRequest, EvAddOffsetsToTxnRequest> {
289-
TEvAddOffsetsToTxnRequest(const ui64 correlationId, const TMessagePtr<TAddOffsetsToTxnRequestData>& request, const TActorId connectionId)
292+
TEvAddOffsetsToTxnRequest(const ui64 correlationId, const TMessagePtr<TAddOffsetsToTxnRequestData>& request, const TActorId connectionId, const TString& databasePath)
290293
: CorrelationId(correlationId)
291294
, Request(request)
292295
, ConnectionId(connectionId)
296+
, DatabasePath(databasePath)
293297
{}
294298

295299
ui64 CorrelationId;
296300
const TMessagePtr<TAddOffsetsToTxnRequestData> Request;
297301
TActorId ConnectionId;
302+
TString DatabasePath;
298303
};
299304

300305
struct TEvTxnOffsetCommitRequest : public TEventLocal<TEvTxnOffsetCommitRequest, EvTxnOffsetCommitRequest> {
301-
TEvTxnOffsetCommitRequest(const ui64 correlationId, const TMessagePtr<TTxnOffsetCommitRequestData>& request, const TActorId connectionId)
306+
TEvTxnOffsetCommitRequest(const ui64 correlationId, const TMessagePtr<TTxnOffsetCommitRequestData>& request, const TActorId connectionId, const TString& databasePath)
302307
: CorrelationId(correlationId)
303308
, Request(request)
304309
, ConnectionId(connectionId)
310+
, DatabasePath(databasePath)
305311
{}
306312

307313
ui64 CorrelationId;
308314
const TMessagePtr<TTxnOffsetCommitRequestData> Request;
309315
TActorId ConnectionId;
316+
TString DatabasePath;
310317
};
311318

312319
struct TEvEndTxnRequest : public TEventLocal<TEvEndTxnRequest, EvEndTxnRequest> {
313-
TEvEndTxnRequest(const ui64 correlationId, const TMessagePtr<TEndTxnRequestData>& request, const TActorId connectionId)
320+
TEvEndTxnRequest(const ui64 correlationId, const TMessagePtr<TEndTxnRequestData>& request, const TActorId connectionId, const TString& databasePath)
314321
: CorrelationId(correlationId)
315322
, Request(request)
316323
, ConnectionId(connectionId)
324+
, DatabasePath(databasePath)
317325
{}
318326

319327
ui64 CorrelationId;
320328
const TMessagePtr<TEndTxnRequestData> Request;
321329
TActorId ConnectionId;
330+
TString DatabasePath;
331+
};
332+
struct TProducerInstanceId {
333+
i64 Id;
334+
i32 Epoch;
335+
336+
auto operator<=>(TProducerInstanceId const&) const = default;
322337
};
323338

324339
/*
325340
Event sent from TInitProducerActor to TKafkaTransactionRouter to notify that producer id will be obtained by client
326341
*/
327342
struct TEvSaveTxnProducerRequest : public NActors::TEventLocal<TEvSaveTxnProducerRequest, EvSaveTxnProducerRequest> {
328-
TEvSaveTxnProducerRequest(const TString& transactionalId, const i64 producerId, const i16 producerEpoch) :
329-
TransactionalId(std::move(transactionalId)),
330-
ProducerId(producerId),
331-
ProducerEpoch(producerEpoch)
343+
TEvSaveTxnProducerRequest(const TString& transactionalId, const TProducerInstanceId& producerState) :
344+
TransactionalId(transactionalId),
345+
ProducerState(producerState)
332346
{}
333347

334348
const TString TransactionalId;
335-
const i64 ProducerId;
336-
const i16 ProducerEpoch;
349+
const TProducerInstanceId ProducerState;
337350
};
338351

339352
/*
@@ -357,6 +370,16 @@ struct TEvSaveTxnProducerResponse : public NActors::TEventLocal<TEvSaveTxnProduc
357370
EStatus Status;
358371
TString Message;
359372
};
373+
374+
// Local event that carries the transactional id and the producer instance id (id + epoch).
// NOTE(review): presumably sent when a transaction actor terminates - confirm against the sender.
struct TEvTransactionActorDied : public NActors::TEventLocal<TEvTransactionActorDied, EvTransactionActorDied> {
    TEvTransactionActorDied(const TString& transactionalId, const TProducerInstanceId& producerState)
        : TransactionalId(transactionalId)
        , ProducerState(producerState)
    {}

    const TString TransactionalId;
    const TProducerInstanceId ProducerState;
};
360383
}; // struct TEvKafka
361384

362385
} // namespace NKafka

0 commit comments

Comments
 (0)