Skip to content

Commit cfefaab

Browse files
committed
Flush on insert before distributed commit (#19293)
1 parent a622789 commit cfefaab

File tree

1 file changed

+26
-7
lines changed

1 file changed

+26
-7
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,19 +1961,20 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19611961
return Process();
19621962
}
19631963

1964-
bool Prepare(const ui64 txId, NWilson::TTraceId traceId) {
1964+
bool Prepare(std::optional<NWilson::TTraceId> traceId) {
19651965
UpdateTracingState("Commit", std::move(traceId));
19661966
OperationStartTime = TInstant::Now();
19671967

19681968
CA_LOG_D("Start prepare for distributed commit");
19691969
YQL_ENSURE(State == EState::WRITING);
1970+
YQL_ENSURE(!NeedToFlushBeforeCommit);
19701971
State = EState::PREPARING;
19711972
for (auto& [_, queue] : DataQueues) {
19721973
YQL_ENSURE(queue.empty());
19731974
}
1974-
TxId = txId;
1975+
YQL_ENSURE(TxId)
19751976
for (auto& [_, info] : WriteInfos) {
1976-
info.WriteTableActor->SetPrepare(txId);
1977+
info.WriteTableActor->SetPrepare(*TxId);
19771978
}
19781979
Close();
19791980
if (!Process()) {
@@ -2424,8 +2425,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
24242425
TxManager->StartExecute();
24252426
ImmediateCommit(std::move(ev->TraceId));
24262427
} else {
2427-
TxManager->StartPrepare();
2428-
Prepare(ev->Get()->TxId, std::move(ev->TraceId));
2428+
AFL_ENSURE(ev->Get()->TxId);
2429+
TxId = ev->Get()->TxId;
2430+
if (NeedToFlushBeforeCommit) {
2431+
Flush(std::move(ev->TraceId));
2432+
} else {
2433+
TxManager->StartPrepare();
2434+
Prepare(std::move(ev->TraceId));
2435+
}
24292436
}
24302437
}
24312438

@@ -2805,6 +2812,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28052812
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
28062813
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
28072814
State = EState::WRITING;
2815+
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
2816+
NeedToFlushBeforeCommit = false;
2817+
if (TxId) {
2818+
TxManager->StartPrepare();
2819+
Prepare(std::nullopt);
2820+
return;
2821+
}
2822+
28082823
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
28092824
BuildStats()
28102825
});
@@ -2846,8 +2861,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28462861
ReplyErrorAndDieImpl(statusCode, std::move(issues));
28472862
}
28482863

2849-
void UpdateTracingState(const char* name, NWilson::TTraceId traceId) {
2850-
BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(traceId),
2864+
void UpdateTracingState(const char* name, std::optional<NWilson::TTraceId> traceId) {
2865+
if (!traceId) {
2866+
return;
2867+
}
2868+
BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(*traceId),
28512869
name, NWilson::EFlags::AUTO_END);
28522870
if (BufferWriteActorStateSpan.GetTraceId() != BufferWriteActorSpan.GetTraceId()) {
28532871
BufferWriteActorStateSpan.Link(BufferWriteActorSpan.GetTraceId());
@@ -2929,6 +2947,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29292947

29302948
EState State;
29312949
bool HasError = false;
2950+
bool NeedToFlushBeforeCommit = false;
29322951
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;
29332952

29342953
struct TAckMessage {

0 commit comments

Comments
 (0)