Skip to content

Commit 33701df

Browse files
authored
Refactor write actor buffers (#19300)
1 parent 0e700ae commit 33701df

File tree

3 files changed

+19
-22
lines changed

3 files changed

+19
-22
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
266266

267267
if (TxManager) {
268268
TxManager->SetHasSnapshot(GetSnapshot().IsValid());
269-
270-
for (const ui64& shardId : TxManager->GetShards()) {
271-
Stats->AffectedShards.insert(shardId);
272-
}
273269
}
274270

275271
if (!BufferActorId || (ReadOnlyTx && Request.LocksOp != ELocksOp::Rollback)) {
@@ -392,6 +388,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
392388
}
393389
}
394390

391+
if (TxManager) {
392+
for (const ui64& shardId : TxManager->GetShards()) {
393+
Stats->AffectedShards.insert(shardId);
394+
}
395+
}
396+
395397
auto resultSize = ResponseEv->GetByteSize();
396398
if (resultSize > (int)ReplySizeLimit) {
397399
TString message;

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
14951495
WriteTableActor->Write(WriteToken, Batcher->Build());
14961496
if (Closed) {
14971497
WriteTableActor->Close(WriteToken);
1498+
WriteTableActor->FlushBuffers();
14981499
WriteTableActor->Close();
14991500
}
15001501
} catch (const TMemoryLimitExceededException&) {
@@ -1536,6 +1537,10 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
15361537
return;
15371538
}
15381539

1540+
if (!Closed && outOfMemory) {
1541+
WriteTableActor->FlushBuffers();
1542+
}
1543+
15391544
if (Closed || outOfMemory) {
15401545
if (!WriteTableActor->FlushToShards()) {
15411546
return;
@@ -2025,6 +2030,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20252030
bool flushFailed = false;
20262031
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
20272032
if (!flushFailed && actor->IsReady()) {
2033+
actor->FlushBuffers();
20282034
if (!actor->FlushToShards()) {
20292035
flushFailed = true;
20302036
}

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,8 +1385,6 @@ class TShardedWriteController : public IShardedWriteController {
13851385
for (const auto& [token, writeInfo] : WriteInfos) {
13861386
if (writeInfo.Closed) {
13871387
Close(token);
1388-
} else {
1389-
FlushSerializer(token, GetMemory() >= Settings.MemoryLimitTotal);
13901388
}
13911389
}
13921390
}
@@ -1442,21 +1440,13 @@ class TShardedWriteController : public IShardedWriteController {
14421440
data->AttachAlloc(Alloc);
14431441
}
14441442
info.Serializer->AddData(std::move(data));
1445-
1446-
if (info.Metadata.Priority == 0) {
1447-
FlushSerializer(token, GetMemory() >= Settings.MemoryLimitTotal);
1448-
}
14491443
}
14501444

14511445
void Close(TWriteToken token) override {
14521446
auto& info = WriteInfos.at(token);
14531447
AFL_ENSURE(info.Serializer);
14541448
info.Closed = true;
14551449
info.Serializer->Close();
1456-
if (info.Metadata.Priority == 0) {
1457-
FlushSerializer(token, true);
1458-
AFL_ENSURE(info.Serializer->IsFinished());
1459-
}
14601450
}
14611451

14621452
void CleanupClosedTokens() override {
@@ -1474,13 +1464,8 @@ class TShardedWriteController : public IShardedWriteController {
14741464
void FlushBuffers() override {
14751465
TVector<TWriteToken> writeTokensFoFlush;
14761466
for (const auto& [token, writeInfo] : WriteInfos) {
1477-
AFL_ENSURE(writeInfo.Closed);
1478-
if (writeInfo.Metadata.Priority != 0) {
1479-
if (!writeInfo.Serializer->IsFinished()) {
1480-
writeTokensFoFlush.push_back(token);
1481-
}
1482-
} else {
1483-
AFL_ENSURE(writeInfo.Serializer->IsFinished());
1467+
if ((writeInfo.Metadata.Priority == 0 || writeInfo.Closed) && !writeInfo.Serializer->IsFinished()) {
1468+
writeTokensFoFlush.push_back(token);
14841469
}
14851470
}
14861471

@@ -1495,7 +1480,11 @@ class TShardedWriteController : public IShardedWriteController {
14951480

14961481
for (const TWriteToken token : writeTokensFoFlush) {
14971482
FlushSerializer(token, true);
1498-
AFL_ENSURE(WriteInfos.at(token).Serializer->IsFinished());
1483+
const auto& writeInfo = WriteInfos.at(token);
1484+
if (writeInfo.Metadata.Priority != 0) {
1485+
AFL_ENSURE(writeInfo.Closed);
1486+
AFL_ENSURE(writeInfo.Serializer->IsFinished());
1487+
}
14991488
}
15001489
}
15011490

0 commit comments

Comments
 (0)