Skip to content

Commit 2c5c1c1

Browse files
authored
[Stable-25-1-2] Flush only insert before distributed commit (#20586)
2 parents e49b360 + 23a9b0d commit 2c5c1c1

File tree

2 files changed

+53
-18
lines changed

2 files changed

+53
-18
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
333333
std::move(columnsMetadata),
334334
std::move(writeIndexes),
335335
priority);
336+
337+
// At current time only insert operation can fail.
338+
NeedToFlushBeforeCommit |= (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);
336339
CA_LOG_D("Open: token=" << token);
337340
return token;
338341
}
@@ -365,6 +368,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
365368
void CleanupClosedTokens() {
366369
YQL_ENSURE(ShardedWriteController);
367370
ShardedWriteController->CleanupClosedTokens();
371+
NeedToFlushBeforeCommit = false;
368372
}
369373

370374
void SetParentTraceId(NWilson::TTraceId traceId) {
@@ -1301,6 +1305,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
13011305
Stats.AffectedPartitions.clear();
13021306
}
13031307

1308+
bool FlushBeforeCommit() const {
1309+
return NeedToFlushBeforeCommit;
1310+
}
1311+
13041312
private:
13051313
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
13061314

@@ -1329,6 +1337,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
13291337

13301338
IKqpTransactionManagerPtr TxManager;
13311339
bool Closed = false;
1340+
bool NeedToFlushBeforeCommit = false;
13321341
EMode Mode = EMode::WRITE;
13331342
THashMap<ui64, TInstant> SendTime;
13341343

@@ -1659,7 +1668,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
16591668
std::optional<TKqpTableWriteActor::TWriteToken> WriteToken;
16601669

16611670
bool Closed = false;
1662-
16631671
bool WaitingForTableActor = false;
16641672

16651673
NWilson::TSpan DirectWriteActorSpan;
@@ -1877,6 +1885,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18771885
Process();
18781886
}
18791887

1888+
bool NeedToFlush() {
1889+
const bool outOfMemory = GetTotalFreeSpace() <= 0;
1890+
const bool needToFlush = outOfMemory
1891+
|| State == EState::FLUSHING
1892+
|| State == EState::PREPARING
1893+
|| State == EState::COMMITTING
1894+
|| State == EState::ROLLINGBACK;
1895+
return needToFlush;
1896+
}
1897+
1898+
bool NeedToFlushActor(const TKqpTableWriteActor* actor) {
1899+
return NeedToFlush()
1900+
&& (State != EState::FLUSHING
1901+
|| !TxId // Flush between queries
1902+
|| actor->FlushBeforeCommit()); // Flush before commit
1903+
}
1904+
18801905
bool Process() {
18811906
ProcessRequestQueue();
18821907
if (!ProcessWrite()) {
@@ -1887,7 +1912,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18871912
if (State == EState::FLUSHING) {
18881913
bool isEmpty = true;
18891914
for (auto& [_, info] : WriteInfos) {
1890-
isEmpty = isEmpty && info.WriteTableActor->IsReady() && info.WriteTableActor->IsEmpty();
1915+
if (NeedToFlushActor(info.WriteTableActor)) {
1916+
isEmpty = isEmpty && info.WriteTableActor->IsReady() && info.WriteTableActor->IsEmpty();
1917+
}
18911918
}
18921919
if (isEmpty) {
18931920
OnFlushed();
@@ -1942,14 +1969,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19421969
}
19431970

19441971
bool ProcessWrite() {
1945-
const bool outOfMemory = GetTotalFreeSpace() <= 0;
1946-
const bool needToFlush = outOfMemory
1947-
|| State == EState::FLUSHING
1948-
|| State == EState::PREPARING
1949-
|| State == EState::COMMITTING
1950-
|| State == EState::ROLLINGBACK;
1951-
1952-
if (!EnableStreamWrite && outOfMemory) {
1972+
if (!EnableStreamWrite && GetTotalFreeSpace() <= 0) {
19531973
ReplyErrorAndDie(
19541974
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
19551975
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
@@ -1959,10 +1979,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19591979
return false;
19601980
}
19611981

1962-
if (needToFlush) {
1982+
if (NeedToFlush()) {
19631983
CA_LOG_D("Flush data");
19641984
for (auto& [_, info] : WriteInfos) {
1965-
if (info.WriteTableActor->IsReady()) {
1985+
if (info.WriteTableActor->IsReady() && NeedToFlushActor(info.WriteTableActor)) {
19661986
if (!info.WriteTableActor->FlushToShards()) {
19671987
return false;
19681988
}
@@ -1992,13 +2012,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19922012

19932013
CA_LOG_D("Start prepare for distributed commit");
19942014
YQL_ENSURE(State == EState::WRITING);
1995-
YQL_ENSURE(!NeedToFlushBeforeCommit);
19962015
State = EState::PREPARING;
19972016
for (auto& [_, queue] : DataQueues) {
19982017
YQL_ENSURE(queue.empty());
19992018
}
20002019
YQL_ENSURE(TxId);
20012020
for (auto& [_, info] : WriteInfos) {
2021+
AFL_ENSURE(!info.WriteTableActor->FlushBeforeCommit());
20022022
info.WriteTableActor->SetPrepare(*TxId);
20032023
}
20042024
Close();
@@ -2452,7 +2472,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
24522472
} else {
24532473
AFL_ENSURE(ev->Get()->TxId);
24542474
TxId = ev->Get()->TxId;
2455-
if (NeedToFlushBeforeCommit) {
2475+
2476+
bool needToFlushBeforeCommit = false;
2477+
for (auto& [_, info] : WriteInfos) {
2478+
needToFlushBeforeCommit |= info.WriteTableActor->FlushBeforeCommit();
2479+
}
2480+
2481+
if (needToFlushBeforeCommit) {
24562482
Flush(std::move(ev->TraceId));
24572483
} else {
24582484
TxManager->StartPrepare();
@@ -2837,8 +2863,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28372863
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
28382864
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
28392865
State = EState::WRITING;
2840-
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
2841-
NeedToFlushBeforeCommit = false;
2866+
2867+
for (auto& [_, info] : WriteInfos) {
2868+
AFL_ENSURE(TxId || info.WriteTableActor->IsEmpty());
2869+
if (info.WriteTableActor->IsEmpty()) {
2870+
info.WriteTableActor->CleanupClosedTokens();
2871+
}
2872+
if (!TxId) {
2873+
info.WriteTableActor->Unlink();
2874+
}
2875+
2876+
AFL_ENSURE(!info.WriteTableActor->FlushBeforeCommit());
2877+
}
2878+
28422879
if (TxId) {
28432880
TxManager->StartPrepare();
28442881
Prepare(std::nullopt);
@@ -2972,7 +3009,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29723009

29733010
EState State;
29743011
bool HasError = false;
2975-
bool NeedToFlushBeforeCommit = false;
29763012
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;
29773013

29783014
struct TAckMessage {

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1381,7 +1381,6 @@ class TShardedWriteController : public IShardedWriteController {
13811381
}
13821382

13831383
void CleanupClosedTokens() override {
1384-
AFL_ENSURE(IsEmpty());
13851384
for (auto it = WriteInfos.begin(); it != WriteInfos.end();) {
13861385
if (it->second.Closed) {
13871386
AFL_ENSURE(it->second.Serializer->IsFinished());

0 commit comments

Comments
 (0)