Skip to content

[Kafka API] Aborted transactions cleanup #19733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from

Conversation

a-serebryanskiy
Copy link
Collaborator

Changelog entry

This PR is part of the "Transactions in Kafka API" epic. It adds functionality to cleanup data for aborted transactions.

Changelog category

  • Not for changelog (changelog entry is not required)

Description for reviewers

...

Copy link

🟢 2025-06-17 11:57:31 UTC The validation of the Pull Request description is successful.

Copy link

github-actions bot commented Jun 18, 2025

2025-06-18 14:21:14 UTC Pre-commit check linux-x86_64-relwithdebinfo for d248896 has started.
2025-06-18 14:21:39 UTC Artifacts will be uploaded here
2025-06-18 14:24:58 UTC Check cancelled

Copy link

github-actions bot commented Jun 18, 2025

2025-06-18 14:21:36 UTC Pre-commit check linux-x86_64-release-asan for d248896 has started.
2025-06-18 14:21:47 UTC Artifacts will be uploaded here
2025-06-18 14:24:58 UTC Check cancelled

Copy link

github-actions bot commented Jun 18, 2025

2025-06-18 14:27:16 UTC Pre-commit check linux-x86_64-relwithdebinfo for 11afc85 has started.
2025-06-18 14:27:28 UTC Artifacts will be uploaded here
2025-06-18 14:31:13 UTC ya make is running...
2025-06-18 14:36:39 UTC Check cancelled

Copy link

github-actions bot commented Jun 18, 2025

2025-06-18 14:27:24 UTC Pre-commit check linux-x86_64-release-asan for 11afc85 has started.
2025-06-18 14:27:44 UTC Artifacts will be uploaded here
2025-06-18 14:32:04 UTC ya make is running...
2025-06-18 14:36:39 UTC Check cancelled

Copy link

github-actions bot commented Jun 18, 2025

2025-06-18 14:38:25 UTC Pre-commit check linux-x86_64-release-asan for 28daa9e has started.
2025-06-18 14:38:29 UTC Artifacts will be uploaded here
2025-06-18 14:42:20 UTC ya make is running...
🟡 2025-06-18 17:06:52 UTC Some tests failed, follow the links below. This fail is not in blocking policy yet Going to retry failed tests...

Test history | Ya make output | Test bloat

TESTS PASSED ERRORS FAILED SKIPPED MUTED?
15903 15601 0 105 172 25

2025-06-18 17:08:29 UTC ya make is running... (failed tests rerun, try 2)
🟡 2025-06-18 17:41:33 UTC Some tests failed, follow the links below. This fail is not in blocking policy yet Going to retry failed tests...

Test history | Ya make output | Test bloat | Test bloat

TESTS PASSED ERRORS FAILED SKIPPED MUTED?
1551 (only retried tests) 1301 0 70 155 25

2025-06-18 17:41:51 UTC ya make is running... (failed tests rerun, try 3)
🟡 2025-06-18 18:15:24 UTC Some tests failed, follow the links below. This fail is not in blocking policy yet

Test history | Ya make output | Test bloat | Test bloat | Test bloat

TESTS PASSED ERRORS FAILED SKIPPED MUTED?
1563 (only retried tests) 1352 0 54 133 24

🟢 2025-06-18 18:15:40 UTC Build successful.
🟡 2025-06-18 18:16:15 UTC ydbd size 3.9 GiB changed* by +748.1 KiB, which is >= 100.0 KiB vs main: Warning

ydbd size dash main: 0808af4 merge: 28daa9e diff diff %
ydbd size 4 174 298 976 Bytes 4 175 065 024 Bytes +748.1 KiB +0.018%
ydbd stripped size 1 447 457 720 Bytes 1 447 640 088 Bytes +178.1 KiB +0.013%

*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation

Copy link

github-actions bot commented Jun 18, 2025

2025-06-18 14:48:33 UTC Pre-commit check linux-x86_64-relwithdebinfo for 28daa9e has started.
2025-06-18 14:49:01 UTC Artifacts will be uploaded here
2025-06-18 14:53:23 UTC ya make is running...
🟡 2025-06-18 16:44:29 UTC Some tests failed, follow the links below. Going to retry failed tests...

Test history | Ya make output | Test bloat

TESTS PASSED ERRORS FAILED SKIPPED MUTED?
38190 35489 0 4 2664 33

2025-06-18 16:47:48 UTC ya make is running... (failed tests rerun, try 2)
🟢 2025-06-18 16:59:32 UTC Tests successful.

Test history | Ya make output | Test bloat | Test bloat

TESTS PASSED ERRORS FAILED SKIPPED MUTED?
632 (only retried tests) 604 0 0 3 25

🟢 2025-06-18 16:59:45 UTC Build successful.
🟡 2025-06-18 17:00:05 UTC ydbd size 2.2 GiB changed* by +597.4 KiB, which is >= 100.0 KiB vs main: Warning

ydbd size dash main: 38bd366 merge: 28daa9e diff diff %
ydbd size 2 373 172 192 Bytes 2 373 783 920 Bytes +597.4 KiB +0.026%
ydbd stripped size 497 552 456 Bytes 497 650 152 Bytes +95.4 KiB +0.020%

*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation

// after transaction timeout has passed.
//
// Total time till kafka supportive partition deletion = AppData.KafkaProxyConfig.TransactionTimeoutMs + KAFKA_TRANSACTION_DELETE_DELAY_MS
static const ui32 KAFKA_TRANSACTION_DELETE_DELAY_MS = 60 * 60 * 1000; // 1 hour;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Можно TDuration::Hours(1).MilliSeconds(). Принципиально хранить в виде целого числа? Можно поменять тип на TDuration?

@@ -925,6 +926,11 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
if (writeId.IsTopicApiTransaction()) {
SubscribeWriteId(writeId, ctx);
}

if (txWrite.GetKafkaTransaction() && txWrite.HasCreatedAt()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мы защитились feature-флагом от ситуации когда нет одного из значений?

};

for (auto& pair : TxWrites) {
if (pair.second.KafkaTransaction && txnExpired(pair.second)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Первое условие лишнее? Оно повторно проверяется в txnExpired

}

TString GetSupportivePartitionKeyFrom() {
return std::string{TKeyPrefix::EServiceType::ServiceTypeData};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лишнее преобразование из std::string в TString. Здесь и ниже.

const auto& result = response->Record.GetReadRangeResult(0);
if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) {
for (ui32 i = 0; i < result.PairSize(); i++) {
supportivePartitionsKeys.emplace_back(result.GetPair(i).GetKey().c_str());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Без вызова c_str() не компилируется?

@@ -2305,6 +2305,7 @@ message TKafkaProxyConfig {
optional TProxy Proxy = 7;
optional bool MeteringV2Enabled = 10 [default = true];
optional bool AuthViaApiKey = 11 [default = true];
optional uint32 TransactionTimeoutMs = 12 [default = 86400]; // 1 day
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Значение по умолчанию надо умножить на 1000.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants