-
Notifications
You must be signed in to change notification settings - Fork 695
[Kafka API] Aborted transactions cleanup #19733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Kafka API] Aborted transactions cleanup #19733
Conversation
🟢 |
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
// after transaction timeout has passed. | ||
// | ||
// Total time till kafka supportive partition deletion = AppData.KafkaProxyConfig.TransactionTimeoutMs + KAFKA_TRANSACTION_DELETE_DELAY_MS | ||
static const ui32 KAFKA_TRANSACTION_DELETE_DELAY_MS = 60 * 60 * 1000; // 1 hour; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Можно TDuration::Hours(1).MilliSeconds()
. Принципиально хранить в виде целого числа? Можно поменять тип на TDuration
?
@@ -925,6 +926,11 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info, | |||
if (writeId.IsTopicApiTransaction()) { | |||
SubscribeWriteId(writeId, ctx); | |||
} | |||
|
|||
if (txWrite.GetKafkaTransaction() && txWrite.HasCreatedAt()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Мы защитились feature-флагом от ситуации когда нет одного из значений?
}; | ||
|
||
for (auto& pair : TxWrites) { | ||
if (pair.second.KafkaTransaction && txnExpired(pair.second)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Первое условие лишнее? Оно повторно проверяется в txnExpired
} | ||
|
||
TString GetSupportivePartitionKeyFrom() { | ||
return std::string{TKeyPrefix::EServiceType::ServiceTypeData}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Лишнее преобразование из std::string
в TString
. Здесь и ниже.
const auto& result = response->Record.GetReadRangeResult(0); | ||
if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) { | ||
for (ui32 i = 0; i < result.PairSize(); i++) { | ||
supportivePartitionsKeys.emplace_back(result.GetPair(i).GetKey().c_str()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Без вызова c_str()
не компилируется?
@@ -2305,6 +2305,7 @@ message TKafkaProxyConfig { | |||
optional TProxy Proxy = 7; | |||
optional bool MeteringV2Enabled = 10 [default = true]; | |||
optional bool AuthViaApiKey = 11 [default = true]; | |||
optional uint32 TransactionTimeoutMs = 12 [default = 86400]; // 1 day |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Значение по умолчанию надо умножить на 1000.
Changelog entry
This PR is part of the "Transactions in Kafka API" epic. It adds functionality to cleanup data for aborted transactions.
Changelog category
Description for reviewers
...