Skip to content

Commit 75782f3

Browse files
nikvas0blinkov
authored andcommitted
Fix lost data for stream EvWrite (#14538)
1 parent aa3fafd commit 75782f3

File tree

1 file changed

+26
-23
lines changed

1 file changed

+26
-23
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,14 +1649,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
16491649
writeInfo.WriteTableActor->Close(message.Token.Cookie);
16501650
}
16511651

1652-
if (!message.Close) {
1653-
YQL_ENSURE(false);
1654-
AckQueue.push(TAckMessage{
1655-
.ForwardActorId = message.From,
1656-
.Token = message.Token,
1657-
.DataSize = 0,
1658-
});
1659-
}
1652+
AckQueue.push(TAckMessage{
1653+
.ForwardActorId = message.From,
1654+
.Token = message.Token,
1655+
.DataSize = 0,
1656+
});
16601657

16611658
queue.pop();
16621659
}
@@ -2493,6 +2490,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
24932490
}
24942491
Y_UNUSED(dataSize);
24952492
if (TxManager->ConsumeCommitResult(shardId)) {
2493+
CA_LOG_D("Committed TxId=" << TxId.value_or(0));
24962494
OnOperationFinished(Counters->BufferActorCommitLatencyHistogram);
24972495
State = EState::FINISHED;
24982496
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
@@ -2710,15 +2708,29 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
27102708

27112709
void Handle(TEvBufferWriteResult::TPtr& result) {
27122710
CA_LOG_D("TKqpForwardWriteActor recieve EvBufferWriteResult from " << BufferActorId);
2711+
InFlight = false;
2712+
2713+
EgressStats.Bytes += DataSize;
2714+
EgressStats.Chunks++;
2715+
EgressStats.Splits++;
2716+
EgressStats.Resume();
2717+
2718+
Counters->ForwardActorWritesSizeHistogram->Collect(DataSize);
27132719

27142720
WriteToken = result->Get()->Token;
27152721
DataSize = 0;
27162722

2717-
CA_LOG_D("Resume with freeSpace=" << GetFreeSpace());
2718-
Callbacks->ResumeExecution();
2723+
if (Closed) {
2724+
CA_LOG_D("Finished");
2725+
Callbacks->OnAsyncOutputFinished(GetOutputIndex());
2726+
} else {
2727+
CA_LOG_D("Resume with freeSpace=" << GetFreeSpace());
2728+
Callbacks->ResumeExecution();
2729+
}
27192730
}
27202731

27212732
void WriteToBuffer() {
2733+
InFlight = true;
27222734
auto ev = std::make_unique<TEvBufferWrite>();
27232735

27242736
ev->Data = Batcher->Build();
@@ -2762,18 +2774,6 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
27622774

27632775
CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId);
27642776
AFL_ENSURE(Send(BufferActorId, ev.release()));
2765-
2766-
EgressStats.Bytes += DataSize;
2767-
EgressStats.Chunks++;
2768-
EgressStats.Splits++;
2769-
EgressStats.Resume();
2770-
2771-
Counters->ForwardActorWritesSizeHistogram->Collect(DataSize);
2772-
2773-
if (Closed) {
2774-
CA_LOG_D("Finished");
2775-
Callbacks->OnAsyncOutputFinished(GetOutputIndex());
2776-
}
27772777
}
27782778

27792779
void CommitState(const NYql::NDqProto::TCheckpoint&) final {};
@@ -2788,7 +2788,9 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
27882788
}
27892789

27902790
i64 GetFreeSpace() const final {
2791-
return MessageSettings.MaxForwardedSize - DataSize;
2791+
return InFlight
2792+
? std::numeric_limits<i64>::min()
2793+
: MessageSettings.MaxForwardedSize - DataSize;
27922794
}
27932795

27942796
TMaybe<google::protobuf::Any> ExtraData() override {
@@ -2840,6 +2842,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
28402842

28412843
i64 DataSize = 0;
28422844
bool Closed = false;
2845+
bool InFlight = false;
28432846

28442847
const ui64 TxId;
28452848
const TTableId TableId;

0 commit comments

Comments
 (0)