Skip to content

Commit b16e4d1

Browse files
committed
Flush only INSERTs before commit (#19744)
1 parent efdee79 commit b16e4d1

File tree

2 files changed

+54
-19
lines changed

2 files changed

+54
-19
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
325325
std::move(columnsMetadata),
326326
std::move(writeIndexes),
327327
priority);
328+
329+
// At current time only insert operation can fail.
330+
NeedToFlushBeforeCommit |= (operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);
328331
CA_LOG_D("Open: token=" << token);
329332
return token;
330333
}
@@ -357,6 +360,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
357360
void CleanupClosedTokens() {
358361
YQL_ENSURE(ShardedWriteController);
359362
ShardedWriteController->CleanupClosedTokens();
363+
NeedToFlushBeforeCommit = false;
360364
}
361365

362366
void SetParentTraceId(NWilson::TTraceId traceId) {
@@ -1276,6 +1280,10 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
12761280
Stats.AffectedPartitions.clear();
12771281
}
12781282

1283+
bool FlushBeforeCommit() const {
1284+
return NeedToFlushBeforeCommit;
1285+
}
1286+
12791287
private:
12801288
NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
12811289

@@ -1304,6 +1312,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
13041312

13051313
IKqpTransactionManagerPtr TxManager;
13061314
bool Closed = false;
1315+
bool NeedToFlushBeforeCommit = false;
13071316
EMode Mode = EMode::WRITE;
13081317
THashMap<ui64, TInstant> SendTime;
13091318

@@ -1634,7 +1643,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
16341643
std::optional<TKqpTableWriteActor::TWriteToken> WriteToken;
16351644

16361645
bool Closed = false;
1637-
16381646
bool WaitingForTableActor = false;
16391647

16401648
NWilson::TSpan DirectWriteActorSpan;
@@ -1852,6 +1860,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18521860
Process();
18531861
}
18541862

1863+
bool NeedToFlush() {
1864+
const bool outOfMemory = GetTotalFreeSpace() <= 0;
1865+
const bool needToFlush = outOfMemory
1866+
|| State == EState::FLUSHING
1867+
|| State == EState::PREPARING
1868+
|| State == EState::COMMITTING
1869+
|| State == EState::ROLLINGBACK;
1870+
return needToFlush;
1871+
}
1872+
1873+
bool NeedToFlushActor(const TKqpTableWriteActor* actor) {
1874+
return NeedToFlush()
1875+
&& (State != EState::FLUSHING
1876+
|| !TxId // Flush between queries
1877+
|| actor->FlushBeforeCommit()); // Flush before commit
1878+
}
1879+
18551880
bool Process() {
18561881
ProcessRequestQueue();
18571882
if (!ProcessWrite()) {
@@ -1862,7 +1887,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18621887
if (State == EState::FLUSHING) {
18631888
bool isEmpty = true;
18641889
for (auto& [_, info] : WriteInfos) {
1865-
isEmpty = isEmpty && info.WriteTableActor->IsReady() && info.WriteTableActor->IsEmpty();
1890+
if (NeedToFlushActor(actor)) {
1891+
isEmpty = isEmpty && info.WriteTableActor->IsReady() && info.WriteTableActor->IsEmpty();
1892+
}
18661893
}
18671894
if (isEmpty) {
18681895
OnFlushed();
@@ -1916,15 +1943,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19161943
}
19171944
}
19181945

1919-
bool ProcessWrite() {
1920-
const bool outOfMemory = GetTotalFreeSpace() <= 0;
1921-
const bool needToFlush = outOfMemory
1922-
|| State == EState::FLUSHING
1923-
|| State == EState::PREPARING
1924-
|| State == EState::COMMITTING
1925-
|| State == EState::ROLLINGBACK;
1926-
1927-
if (!EnableStreamWrite && outOfMemory) {
1946+
bool ProcessFlush() {
1947+
if (!EnableStreamWrite && GetTotalFreeSpace() <= 0) {
19281948
ReplyErrorAndDie(
19291949
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
19301950
NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED,
@@ -1934,10 +1954,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19341954
return false;
19351955
}
19361956

1937-
if (needToFlush) {
1957+
if (NeedToFlush()) {
19381958
CA_LOG_D("Flush data");
19391959
for (auto& [_, info] : WriteInfos) {
1940-
if (info.WriteTableActor->IsReady()) {
1960+
if (info.WriteTableActor->IsReady() && NeedToFlushActor(actor)) {
19411961
if (!info.WriteTableActor->FlushToShards()) {
19421962
return false;
19431963
}
@@ -1967,13 +1987,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19671987

19681988
CA_LOG_D("Start prepare for distributed commit");
19691989
YQL_ENSURE(State == EState::WRITING);
1970-
YQL_ENSURE(!NeedToFlushBeforeCommit);
19711990
State = EState::PREPARING;
19721991
for (auto& [_, queue] : DataQueues) {
19731992
YQL_ENSURE(queue.empty());
19741993
}
19751994
YQL_ENSURE(TxId);
19761995
for (auto& [_, info] : WriteInfos) {
1996+
AFL_ENSURE(!actor->FlushBeforeCommit());
19771997
info.WriteTableActor->SetPrepare(*TxId);
19781998
}
19791999
Close();
@@ -2427,7 +2447,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
24272447
} else {
24282448
AFL_ENSURE(ev->Get()->TxId);
24292449
TxId = ev->Get()->TxId;
2430-
if (NeedToFlushBeforeCommit) {
2450+
2451+
bool needToFlushBeforeCommit = false;
2452+
for (auto& [_, info] : WriteInfos) {
2453+
needToFlushBeforeCommit |= info.WriteTableActor->FlushBeforeCommit();
2454+
}
2455+
2456+
if (needToFlushBeforeCommit) {
24312457
Flush(std::move(ev->TraceId));
24322458
} else {
24332459
TxManager->StartPrepare();
@@ -2812,8 +2838,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28122838
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
28132839
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
28142840
State = EState::WRITING;
2815-
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
2816-
NeedToFlushBeforeCommit = false;
2841+
2842+
for (auto& [_, info] : WriteInfos) {
2843+
AFL_ENSURE(TxId || info.WriteTableActor->IsEmpty());
2844+
if (info.WriteTableActor->IsEmpty()) {
2845+
info.WriteTableActor->CleanupClosedTokens();
2846+
}
2847+
if (!TxId) {
2848+
info.WriteTableActor->Unlink();
2849+
}
2850+
2851+
AFL_ENSURE(!info.WriteTableActor->FlushBeforeCommit());
2852+
}
2853+
28172854
if (TxId) {
28182855
TxManager->StartPrepare();
28192856
Prepare(std::nullopt);
@@ -2947,7 +2984,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29472984

29482985
EState State;
29492986
bool HasError = false;
2950-
bool NeedToFlushBeforeCommit = false;
29512987
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;
29522988

29532989
struct TAckMessage {

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1381,7 +1381,6 @@ class TShardedWriteController : public IShardedWriteController {
13811381
}
13821382

13831383
void CleanupClosedTokens() override {
1384-
AFL_ENSURE(IsEmpty());
13851384
for (auto it = WriteInfos.begin(); it != WriteInfos.end();) {
13861385
if (it->second.Closed) {
13871386
AFL_ENSURE(it->second.Serializer->IsFinished());

0 commit comments

Comments
 (0)